2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
30 #include "lib/util/dlinklist.h"
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
36 this is the dummy null procedure that all databases support
38 static int ctdb_null_func(struct ctdb_call_info *call)
44 this is a plain fetch procedure that all databases support
46 static int ctdb_fetch_func(struct ctdb_call_info *call)
48 call->reply_data = &call->record_data;
53 this is a plain fetch procedure that all databases support
54 this returns the full record including the ltdb header
56 static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
58 call->reply_data = talloc(call, TDB_DATA);
59 if (call->reply_data == NULL) {
62 call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
63 call->reply_data->dptr = talloc_size(call->reply_data, call->reply_data->dsize);
64 if (call->reply_data->dptr == NULL) {
67 memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
68 memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
75 * write a record to a normal database
77 * This is the server-variant of the ctdb_ltdb_store function.
78 * It contains logic to determine whether a record should be
79 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
80 * controls to the local ctdb daemon if appropriate.
82 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
84 struct ctdb_ltdb_header *header,
87 struct ctdb_context *ctdb = ctdb_db->ctdb;
90 bool seqnum_suppressed = false;
92 bool schedule_for_deletion = false;
95 if (ctdb->flags & CTDB_FLAG_TORTURE) {
96 struct ctdb_ltdb_header *h2;
97 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
98 h2 = (struct ctdb_ltdb_header *)rec.dptr;
99 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
100 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
101 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
103 if (rec.dptr) free(rec.dptr);
106 if (ctdb->vnn_map == NULL) {
108 * Called from a client: always store the record
109 * Also don't call ctdb_lmaster since it uses the vnn_map!
115 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
118 * If we migrate an empty record off to another node
119 * and the record has not been migrated with data,
120 * delete the record instead of storing the empty record.
122 if (data.dsize != 0) {
124 } else if (ctdb_db->persistent) {
126 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
128 * The record is not created by the client but
129 * automatically by the ctdb_ltdb_fetch logic that
130 * creates a record with an initial header in the
131 * ltdb before trying to migrate the record from
132 * the current lmaster. Keep it instead of trying
133 * to delete the non-existing record...
136 schedule_for_deletion = true;
137 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
139 } else if (ctdb_db->ctdb->pnn == lmaster) {
141 * If we are lmaster, then we usually keep the record.
142 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
143 * and the record is empty and has never been migrated
144 * with data, then we should delete it instead of storing it.
145 * This is part of the vacuuming process.
147 * The reason that we usually need to store even empty records
148 * on the lmaster is that a client operating directly on the
149 * lmaster (== dmaster) expects the local copy of the record to
150 * exist after successful ctdb migrate call. If the record does
151 * not exist, the client goes into a migrate loop and eventually
152 * fails. So storing the empty record makes sure that we do not
153 * need to change the client code.
155 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
157 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
160 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
166 !ctdb_db->persistent &&
167 (ctdb_db->ctdb->pnn == header->dmaster))
169 schedule_for_deletion = true;
174 * The VACUUM_MIGRATED flag is only set temporarily for
175 * the above logic when the record was retrieved by a
176 * VACUUM_MIGRATE call and should not be stored in the
179 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
180 * and there are two cases in which the corresponding record
181 * is stored in the local database:
182 * 1. The record has been migrated with data in the past
183 * (the MIGRATED_WITH_DATA record flag is set).
184 * 2. The record has been filled with data again since it
185 * had been submitted in the VACUUM_FETCH message to the
187 * For such records it is important to not store the
188 * VACUUM_MIGRATED flag in the database.
190 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
193 * Similarly, clear the AUTOMATIC flag which should not enter
194 * the local database copy since this would require client
195 * modifications to clear the flag when the client stores
198 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
200 rec.dsize = sizeof(*header) + data.dsize;
201 rec.dptr = talloc_size(ctdb, rec.dsize);
202 CTDB_NO_MEMORY(ctdb, rec.dptr);
204 memcpy(rec.dptr, header, sizeof(*header));
205 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
207 /* Databases with seqnum updates enabled only get their seqnum
208 changes when/if we modify the data */
209 if (ctdb_db->seqnum_update != NULL) {
211 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
213 if ( (old.dsize == rec.dsize)
214 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
215 rec.dptr+sizeof(struct ctdb_ltdb_header),
216 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
217 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
218 seqnum_suppressed = true;
220 if (old.dptr) free(old.dptr);
223 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
225 keep?"storing":"deleting",
229 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
231 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
238 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
243 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
246 keep?"store":"delete", ret,
247 tdb_errorstr(ctdb_db->ltdb->tdb)));
249 schedule_for_deletion = false;
251 if (seqnum_suppressed) {
252 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
255 talloc_free(rec.dptr);
257 if (schedule_for_deletion) {
259 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
261 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
268 struct lock_fetch_state {
269 struct ctdb_context *ctdb;
270 void (*recv_pkt)(void *, struct ctdb_req_header *);
272 struct ctdb_req_header *hdr;
274 bool ignore_generation;
278 called when we should retry the operation
280 static void lock_fetch_callback(void *p)
282 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
283 if (!state->ignore_generation &&
284 state->generation != state->ctdb->vnn_map->generation) {
285 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
286 talloc_free(state->hdr);
289 state->recv_pkt(state->recv_context, state->hdr);
290 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
295 do a non-blocking ltdb_lock, deferring this ctdb request until we
298 It does the following:
300 1) tries to get the chainlock. If it succeeds, then it returns 0
302 2) if it fails to get a chainlock immediately then it sets up a
303 non-blocking chainlock via ctdb_lockwait, and when it gets the
304 chainlock it re-submits this ctdb request to the main packet
307 This effectively queues all ctdb requests that cannot be
308 immediately satisfied until it can get the lock. This means that
309 the main ctdb daemon will not block waiting for a chainlock held by
312 There are 3 possible return values:
314 0: means that it got the lock immediately.
315 -1: means that it failed to get the lock, and won't retry
316 -2: means that it failed to get the lock immediately, but will retry
318 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
319 TDB_DATA key, struct ctdb_req_header *hdr,
320 void (*recv_pkt)(void *, struct ctdb_req_header *),
321 void *recv_context, bool ignore_generation)
324 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
325 struct lockwait_handle *h;
326 struct lock_fetch_state *state;
328 ret = tdb_chainlock_nonblock(tdb, key);
331 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
332 /* a hard failure - don't try again */
336 /* when torturing, ensure we test the contended path */
337 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
340 tdb_chainunlock(tdb, key);
343 /* first the non-contended path */
348 state = talloc(hdr, struct lock_fetch_state);
349 state->ctdb = ctdb_db->ctdb;
351 state->recv_pkt = recv_pkt;
352 state->recv_context = recv_context;
353 state->generation = ctdb_db->ctdb->vnn_map->generation;
354 state->ignore_generation = ignore_generation;
356 /* now the contended path */
357 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
362 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
363 so it won't be freed yet */
364 talloc_steal(state, hdr);
365 talloc_steal(state, h);
367 /* now tell the caller that we will retry asynchronously */
372 a variant of ctdb_ltdb_lock_requeue that also fetches the record
374 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
375 TDB_DATA key, struct ctdb_ltdb_header *header,
376 struct ctdb_req_header *hdr, TDB_DATA *data,
377 void (*recv_pkt)(void *, struct ctdb_req_header *),
378 void *recv_context, bool ignore_generation)
382 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
383 recv_context, ignore_generation);
385 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
388 uret = ctdb_ltdb_unlock(ctdb_db, key);
390 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
399 paranoid check to see if the db is empty
401 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
403 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
404 int count = tdb_traverse_read(tdb, NULL, NULL);
406 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
408 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
412 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
413 struct ctdb_db_context *ctdb_db)
415 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
421 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
422 key.dsize = strlen(ctdb_db->db_name);
424 old = ctdb_db->unhealthy_reason;
425 ctdb_db->unhealthy_reason = NULL;
427 val = tdb_fetch(tdb, key);
429 reason = talloc_strndup(ctdb_db,
430 (const char *)val.dptr,
432 if (reason == NULL) {
433 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
435 ctdb_db->unhealthy_reason = old;
446 ctdb_db->unhealthy_reason = reason;
450 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
451 struct ctdb_db_context *ctdb_db,
452 const char *given_reason,/* NULL means healthy */
453 int num_healthy_nodes)
455 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
459 char *new_reason = NULL;
460 char *old_reason = NULL;
462 ret = tdb_transaction_start(tdb);
464 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
465 tdb_name(tdb), ret, tdb_errorstr(tdb)));
469 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
471 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
472 ctdb_db->db_name, ret));
475 old_reason = ctdb_db->unhealthy_reason;
477 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
478 key.dsize = strlen(ctdb_db->db_name);
481 new_reason = talloc_strdup(ctdb_db, given_reason);
482 if (new_reason == NULL) {
483 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
487 } else if (old_reason && num_healthy_nodes == 0) {
489 * If the reason indicates ok, but there were no healthy nodes
490 * available, it means that we have not recovered valid content
491 * of the db. So if there's an old reason, prefix it with
492 * "NO-HEALTHY-NODES - "
496 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
497 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
499 prefix = _TMP_PREFIX;
503 new_reason = talloc_asprintf(ctdb_db, "%s%s",
505 if (new_reason == NULL) {
506 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
507 prefix, old_reason));
514 val.dptr = discard_const_p(uint8_t, new_reason);
515 val.dsize = strlen(new_reason);
517 ret = tdb_store(tdb, key, val, TDB_REPLACE);
519 tdb_transaction_cancel(tdb);
520 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
521 tdb_name(tdb), ctdb_db->db_name, new_reason,
522 ret, tdb_errorstr(tdb)));
523 talloc_free(new_reason);
526 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
527 ctdb_db->db_name, new_reason));
528 } else if (old_reason) {
529 ret = tdb_delete(tdb, key);
531 tdb_transaction_cancel(tdb);
532 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
533 tdb_name(tdb), ctdb_db->db_name,
534 ret, tdb_errorstr(tdb)));
535 talloc_free(new_reason);
538 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
542 ret = tdb_transaction_commit(tdb);
543 if (ret != TDB_SUCCESS) {
544 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
545 tdb_name(tdb), ret, tdb_errorstr(tdb)));
546 talloc_free(new_reason);
550 talloc_free(old_reason);
551 ctdb_db->unhealthy_reason = new_reason;
556 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
557 struct ctdb_db_context *ctdb_db)
559 time_t now = time(NULL);
567 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
568 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
569 "%04u%02u%02u%02u%02u%02u.0Z",
571 tm->tm_year+1900, tm->tm_mon+1,
572 tm->tm_mday, tm->tm_hour, tm->tm_min,
574 if (new_path == NULL) {
575 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
579 new_reason = talloc_asprintf(ctdb_db,
580 "ERROR - Backup of corrupted TDB in '%s'",
582 if (new_reason == NULL) {
583 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
586 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
587 talloc_free(new_reason);
589 DEBUG(DEBUG_CRIT,(__location__
590 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
595 ret = rename(ctdb_db->db_path, new_path);
597 DEBUG(DEBUG_CRIT,(__location__
598 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
599 ctdb_db->db_path, new_path,
600 errno, strerror(errno)));
601 talloc_free(new_path);
605 DEBUG(DEBUG_CRIT,(__location__
606 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
607 ctdb_db->db_path, new_path));
608 talloc_free(new_path);
612 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
614 struct ctdb_db_context *ctdb_db;
619 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
620 if (!ctdb_db->persistent) {
624 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
626 DEBUG(DEBUG_ALERT,(__location__
627 " load persistent health for '%s' failed\n",
632 if (ctdb_db->unhealthy_reason == NULL) {
634 DEBUG(DEBUG_INFO,(__location__
635 " persistent db '%s' healthy\n",
641 DEBUG(DEBUG_ALERT,(__location__
642 " persistent db '%s' unhealthy: %s\n",
644 ctdb_db->unhealthy_reason));
646 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
647 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
659 mark a database - as healthy
661 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
663 uint32_t db_id = *(uint32_t *)indata.dptr;
664 struct ctdb_db_context *ctdb_db;
666 bool may_recover = false;
668 ctdb_db = find_ctdb_db(ctdb, db_id);
670 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
674 if (ctdb_db->unhealthy_reason) {
678 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
680 DEBUG(DEBUG_ERR,(__location__
681 " ctdb_update_persistent_health(%s) failed\n",
686 if (may_recover && !ctdb->done_startup) {
687 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
689 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
695 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
699 uint32_t db_id = *(uint32_t *)indata.dptr;
700 struct ctdb_db_context *ctdb_db;
703 ctdb_db = find_ctdb_db(ctdb, db_id);
705 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
709 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
711 DEBUG(DEBUG_ERR,(__location__
712 " ctdb_load_persistent_health(%s) failed\n",
718 if (ctdb_db->unhealthy_reason) {
719 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
720 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
727 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
731 if (ctdb_db->readonly) {
735 if (ctdb_db->persistent) {
736 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
740 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
741 if (ropath == NULL) {
742 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
745 ctdb_db->rottdb = tdb_open(ropath,
746 ctdb->tunable.database_hash_size,
747 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
749 if (ctdb_db->rottdb == NULL) {
750 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
755 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
757 ctdb_db->readonly = true;
763 attach to a database, handling both persistent and non-persistent databases
764 return 0 on success, -1 on failure
766 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
767 bool persistent, const char *unhealthy_reason,
770 struct ctdb_db_context *ctdb_db, *tmp_db;
775 int remaining_tries = 0;
777 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
778 CTDB_NO_MEMORY(ctdb, ctdb_db);
780 ctdb_db->priority = 1;
781 ctdb_db->ctdb = ctdb;
782 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
783 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
785 key.dsize = strlen(db_name)+1;
786 key.dptr = discard_const(db_name);
787 ctdb_db->db_id = ctdb_hash(&key);
788 ctdb_db->persistent = persistent;
790 if (!ctdb_db->persistent) {
791 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
792 if (ctdb_db->delete_queue == NULL) {
793 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
796 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
799 /* check for hash collisions */
800 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
801 if (tmp_db->db_id == ctdb_db->db_id) {
802 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
803 tmp_db->db_id, db_name, tmp_db->db_name));
804 talloc_free(ctdb_db);
810 if (unhealthy_reason) {
811 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
812 unhealthy_reason, 0);
814 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
815 ctdb_db->db_name, unhealthy_reason, ret));
816 talloc_free(ctdb_db);
821 if (ctdb->max_persistent_check_errors > 0) {
824 if (ctdb->done_startup) {
828 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
830 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
831 ctdb_db->db_name, ret));
832 talloc_free(ctdb_db);
837 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
838 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
839 ctdb_db->db_name, ctdb_db->unhealthy_reason));
840 talloc_free(ctdb_db);
844 if (ctdb_db->unhealthy_reason) {
845 /* this is just a warning, but we want that in the log file! */
846 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
847 ctdb_db->db_name, ctdb_db->unhealthy_reason));
850 /* open the database */
851 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
852 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
855 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
856 if (ctdb->valgrinding) {
857 tdb_flags |= TDB_NOMMAP;
859 tdb_flags |= TDB_DISALLOW_NESTING;
861 tdb_flags |= TDB_INCOMPATIBLE_HASH;
865 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
866 ctdb->tunable.database_hash_size,
868 O_CREAT|O_RDWR, mode);
869 if (ctdb_db->ltdb == NULL) {
871 int saved_errno = errno;
874 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
877 strerror(saved_errno)));
878 talloc_free(ctdb_db);
882 if (remaining_tries == 0) {
883 DEBUG(DEBUG_CRIT,(__location__
884 "Failed to open persistent tdb '%s': %d - %s\n",
887 strerror(saved_errno)));
888 talloc_free(ctdb_db);
892 ret = stat(ctdb_db->db_path, &st);
894 DEBUG(DEBUG_CRIT,(__location__
895 "Failed to open persistent tdb '%s': %d - %s\n",
898 strerror(saved_errno)));
899 talloc_free(ctdb_db);
903 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
905 DEBUG(DEBUG_CRIT,(__location__
906 "Failed to open persistent tdb '%s': %d - %s\n",
909 strerror(saved_errno)));
910 talloc_free(ctdb_db);
920 ctdb_check_db_empty(ctdb_db);
922 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
927 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
928 ctdb_db->db_path, ret,
929 tdb_errorstr(ctdb_db->ltdb->tdb)));
930 if (remaining_tries == 0) {
931 talloc_free(ctdb_db);
935 fd = tdb_fd(ctdb_db->ltdb->tdb);
936 ret = fstat(fd, &st);
938 DEBUG(DEBUG_CRIT,(__location__
939 "Failed to fstat() persistent tdb '%s': %d - %s\n",
943 talloc_free(ctdb_db);
948 talloc_free(ctdb_db->ltdb);
949 ctdb_db->ltdb = NULL;
951 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
953 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
955 talloc_free(ctdb_db);
965 /* set up a rb tree we can use to track which records we have a
966 fetch-lock in-flight for so we can defer any additional calls
969 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
970 if (ctdb_db->deferred_fetch == NULL) {
971 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
972 talloc_free(ctdb_db);
976 DLIST_ADD(ctdb->db_list, ctdb_db);
978 /* setting this can help some high churn databases */
979 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
982 all databases support the "null" function. we need this in
983 order to do forced migration of records
985 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
987 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
988 talloc_free(ctdb_db);
993 all databases support the "fetch" function. we need this
994 for efficient Samba3 ctdb fetch
996 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
998 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
999 talloc_free(ctdb_db);
1004 all databases support the "fetch_with_header" function. we need this
1005 for efficient readonly record fetches
1007 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1009 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1010 talloc_free(ctdb_db);
1014 ret = ctdb_vacuum_init(ctdb_db);
1016 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1017 "database '%s'\n", ctdb_db->db_name));
1018 talloc_free(ctdb_db);
1023 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
1030 struct ctdb_deferred_attach_context {
1031 struct ctdb_deferred_attach_context *next, *prev;
1032 struct ctdb_context *ctdb;
1033 struct ctdb_req_control *c;
1037 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1039 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1044 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1046 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1047 struct ctdb_context *ctdb = da_ctx->ctdb;
1049 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1050 talloc_free(da_ctx);
1053 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1055 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1056 struct ctdb_context *ctdb = da_ctx->ctdb;
1058 /* This talloc-steals the packet ->c */
1059 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1060 talloc_free(da_ctx);
1063 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1065 struct ctdb_deferred_attach_context *da_ctx;
1067 /* call it from the main event loop as soon as the current event
1070 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1071 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1072 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1079 a client has asked to attach a new database
1081 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1082 TDB_DATA *outdata, uint64_t tdb_flags,
1083 bool persistent, uint32_t client_id,
1084 struct ctdb_req_control *c,
1087 const char *db_name = (const char *)indata.dptr;
1088 struct ctdb_db_context *db;
1089 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1090 struct ctdb_client *client = NULL;
1092 if (ctdb->tunable.allow_client_db_attach == 0) {
1093 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1094 "AllowClientDBAccess == 0\n", db_name));
1098 /* don't allow any local clients to attach while we are in recovery mode
1099 * except for the recovery daemon.
1100 * allow all attach from the network since these are always from remote
1103 if (client_id != 0) {
1104 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1106 if (client != NULL) {
1107 /* If the node is inactive it is not part of the cluster
1108 and we should not allow clients to attach to any
1111 if (node->flags & NODE_FLAGS_INACTIVE) {
1112 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1116 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1117 && client->pid != ctdb->recoverd_pid
1118 && !ctdb->done_startup) {
1119 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1121 if (da_ctx == NULL) {
1122 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1126 da_ctx->ctdb = ctdb;
1127 da_ctx->c = talloc_steal(da_ctx, c);
1128 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1129 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1131 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1133 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1134 *async_reply = true;
1139 /* the client can optionally pass additional tdb flags, but we
1140 only allow a subset of those on the database in ctdb. Note
1141 that tdb_flags is passed in via the (otherwise unused)
1142 srvid to the attach control */
1143 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1145 /* see if we already have this name */
1146 db = ctdb_db_handle(ctdb, db_name);
1148 outdata->dptr = (uint8_t *)&db->db_id;
1149 outdata->dsize = sizeof(db->db_id);
1150 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1154 if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1158 db = ctdb_db_handle(ctdb, db_name);
1160 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1164 /* remember the flags the client has specified */
1165 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1167 outdata->dptr = (uint8_t *)&db->db_id;
1168 outdata->dsize = sizeof(db->db_id);
1170 /* Try to ensure it's locked in mem */
1171 ctdb_lockdown_memory(ctdb);
1173 /* tell all the other nodes about this database */
1174 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1175 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1176 CTDB_CONTROL_DB_ATTACH,
1177 0, CTDB_CTRL_FLAG_NOREPLY,
1178 indata, NULL, NULL);
1186 attach to all existing persistent databases
1188 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1189 const char *unhealthy_reason)
1194 /* open the persistent db directory and scan it for files */
1195 d = opendir(ctdb->db_directory_persistent);
1200 while ((de=readdir(d))) {
1202 size_t len = strlen(de->d_name);
1204 int invalid_name = 0;
1206 s = talloc_strdup(ctdb, de->d_name);
1207 CTDB_NO_MEMORY(ctdb, s);
1209 /* only accept names containing ".tdb." */
1210 p = strstr(s, ".tdb.");
1211 if (len < 7 || p == NULL) {
1216 /* only accept names ending with .tdb. and any number of digits */
1218 while (*q != 0 && invalid_name == 0) {
1219 if (!isdigit(*q++)) {
1223 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1224 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1230 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1231 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1237 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1245 int ctdb_attach_databases(struct ctdb_context *ctdb)
1248 char *persistent_health_path = NULL;
1249 char *unhealthy_reason = NULL;
1250 bool first_try = true;
1252 if (ctdb->db_directory == NULL) {
1253 ctdb->db_directory = VARDIR "/ctdb";
1255 if (ctdb->db_directory_persistent == NULL) {
1256 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1258 if (ctdb->db_directory_state == NULL) {
1259 ctdb->db_directory_state = VARDIR "/ctdb/state";
1262 /* make sure the db directory exists */
1263 ret = mkdir(ctdb->db_directory, 0700);
1264 if (ret == -1 && errno != EEXIST) {
1265 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1266 ctdb->db_directory));
1270 /* make sure the persistent db directory exists */
1271 ret = mkdir(ctdb->db_directory_persistent, 0700);
1272 if (ret == -1 && errno != EEXIST) {
1273 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1274 ctdb->db_directory_persistent));
1278 /* make sure the internal state db directory exists */
1279 ret = mkdir(ctdb->db_directory_state, 0700);
1280 if (ret == -1 && errno != EEXIST) {
1281 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1282 ctdb->db_directory_state));
1286 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1287 ctdb->db_directory_state,
1288 PERSISTENT_HEALTH_TDB,
1290 if (persistent_health_path == NULL) {
1291 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1297 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1298 0, TDB_DISALLOW_NESTING,
1299 O_CREAT | O_RDWR, 0600);
1300 if (ctdb->db_persistent_health == NULL) {
1301 struct tdb_wrap *tdb;
1304 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1305 persistent_health_path,
1308 talloc_free(persistent_health_path);
1309 talloc_free(unhealthy_reason);
1314 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1315 persistent_health_path,
1316 "was cleared after a failure",
1317 "manual verification needed");
1318 if (unhealthy_reason == NULL) {
1319 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1320 talloc_free(persistent_health_path);
1324 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1325 persistent_health_path));
1326 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1327 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1328 O_CREAT | O_RDWR, 0600);
1330 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1331 persistent_health_path,
1334 talloc_free(persistent_health_path);
1335 talloc_free(unhealthy_reason);
1342 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1344 struct tdb_wrap *tdb;
1346 talloc_free(ctdb->db_persistent_health);
1347 ctdb->db_persistent_health = NULL;
1350 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1351 persistent_health_path));
1352 talloc_free(persistent_health_path);
1353 talloc_free(unhealthy_reason);
1358 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1359 persistent_health_path,
1360 "was cleared after a failure",
1361 "manual verification needed");
1362 if (unhealthy_reason == NULL) {
1363 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1364 talloc_free(persistent_health_path);
1368 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1369 persistent_health_path));
1370 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1371 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1372 O_CREAT | O_RDWR, 0600);
1374 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1375 persistent_health_path,
1378 talloc_free(persistent_health_path);
1379 talloc_free(unhealthy_reason);
1386 talloc_free(persistent_health_path);
1388 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1389 talloc_free(unhealthy_reason);
1398 called when a broadcast seqnum update comes in
/*
  Apply a sequence-number update for database db_id that was sent by
  node srcnode.

  An update whose srcnode equals our own ctdb->pnn is not applied.
  Otherwise the database is looked up with find_ctdb_db(); the
  "Unknown db_id" and "unhealty" conditions are logged at DEBUG_ERR.
  On the normal path the local tdb seqnum is incremented with
  tdb_increment_seqnum_nonblock() and the new value is cached in
  ctdb_db->seqnum.

  NOTE(review): the return value of each path is not visible in this
  listing - confirm against the complete function.
 */
1400 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1402 struct ctdb_db_context *ctdb_db;
1403 if (srcnode == ctdb->pnn) {
1404 /* don't update ourselves! */
1408 ctdb_db = find_ctdb_db(ctdb, db_id);
/* NOTE(review): the condition guarding this message is not visible here;
   presumably it is a failed find_ctdb_db() lookup - confirm */
1410 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
/* report a database that has an unhealthy_reason recorded
   (NOTE(review): "unhealty" in the message below is a typo for "unhealthy") */
1414 if (ctdb_db->unhealthy_reason) {
1415 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1416 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* bump the tdb seqnum, then re-read it into the cached copy; presumably
   this keeps ctdb_ltdb_seqnum_check() from seeing the bump as a new
   local change and re-broadcasting it - confirm */
1420 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1421 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1426 timer to check for seqnum changes in a ltdb and propagate them
/*
  Timed-event callback for one database.

  Reads the tdb's current seqnum and compares it with the value cached
  in ctdb_db->seqnum. When they differ, a CTDB_CONTROL_UPDATE_SEQNUM
  control carrying the db_id is sent to CTDB_BROADCAST_VNNMAP with
  CTDB_CTRL_FLAG_NOREPLY. The cached seqnum is then set to the value
  just read and the timer is scheduled again with this same function
  as the handler.

  p is the struct ctdb_db_context the timer was registered with.
  ev, te and t are not referenced in the lines shown here.
 */
1428 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1429 struct timeval t, void *p)
1431 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1432 struct ctdb_context *ctdb = ctdb_db->ctdb;
1433 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1434 if (new_seqnum != ctdb_db->seqnum) {
1435 /* something has changed - propagate it */
/* the control payload is just the db_id, sent as sizeof(uint32_t) bytes */
1437 data.dptr = (uint8_t *)&ctdb_db->db_id;
1438 data.dsize = sizeof(uint32_t);
/* NOTE(review): the trailing arguments of this call are not visible in
   this listing */
1439 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1440 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1443 ctdb_db->seqnum = new_seqnum;
1445 /* setup a new timer */
/* tunable.seqnum_interval is split into interval/1000 and
   (interval%1000)*1000 - presumably milliseconds converted to
   (seconds, microseconds) for timeval_current_ofs(); confirm */
1446 ctdb_db->seqnum_update =
1447 event_add_timed(ctdb->ev, ctdb_db,
1448 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1449 ctdb_ltdb_seqnum_check, ctdb_db);
1453 enable seqnum handling on this db
/*
  Turn on sequence-number tracking for database db_id.

  Looks the database up with find_ctdb_db(), starts the
  ctdb_ltdb_seqnum_check() timer if none is registered yet, calls
  tdb_enable_seqnum() on the underlying tdb and caches the tdb's
  current seqnum in ctdb_db->seqnum.

  NOTE(review): the return value of each path is not visible in this
  listing - confirm against the complete function.
 */
1455 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1457 struct ctdb_db_context *ctdb_db;
1458 ctdb_db = find_ctdb_db(ctdb, db_id);
/* NOTE(review): the condition guarding this message is not visible here;
   presumably it is a failed find_ctdb_db() lookup - confirm */
1460 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* only add a timer when seqnum_update is still NULL, so calling this
   again for the same db does not register a second timer */
1464 if (ctdb_db->seqnum_update == NULL) {
1465 ctdb_db->seqnum_update =
1466 event_add_timed(ctdb->ev, ctdb_db,
1467 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1468 ctdb_ltdb_seqnum_check, ctdb_db);
/* enable seqnum on the tdb, then record its current value as the
   baseline that ctdb_ltdb_seqnum_check() compares against */
1471 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1472 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
  Set the priority of a database from a control payload.

  indata.dptr is interpreted as a struct ctdb_db_priority, whose db_id
  selects the database and whose priority is the new value. A priority
  below 1 or above NUM_DB_PRIORITIES is logged as invalid at DEBUG_ERR.
  On the normal path the value is stored in ctdb_db->priority and the
  change is logged at DEBUG_INFO.

  NOTE(review): indata.dsize is not checked in the lines shown here;
  presumably the caller validates the payload size before calling -
  confirm. The return value of each path is likewise not visible in
  this listing.
 */
1476 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1478 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1479 struct ctdb_db_context *ctdb_db;
1481 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
/* NOTE(review): the condition guarding this message is not visible here;
   presumably it is a failed find_ctdb_db() lookup - confirm */
1483 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
/* accepted priorities are 1..NUM_DB_PRIORITIES inclusive */
1487 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1488 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1492 ctdb_db->priority = db_prio->priority;
1493 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));