2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
47 * write a record to a normal database
49 * This is the server-variant of the ctdb_ltdb_store function.
50 * It contains logic to determine whether a record should be
51 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52 * controls to the local ctdb daemon if appropriate.
/*
 * Server-side record store routine; ctdb_local_attach assigns it to
 * ctdb_db->ctdb_ltdb_store_fn.
 *
 * Visible behaviour:
 *  - With CTDB_FLAG_TORTURE set, the stored record is fetched first and
 *    an RSN regression message is logged when its rsn exceeds header->rsn.
 *  - When ctdb->vnn_map is NULL the call comes from a client; ctdb_lmaster
 *    is not consulted in that case because it needs the vnn_map.
 *  - Otherwise the lmaster is computed and a chain of checks on
 *    data.dsize, the record flags, lmaster and header->dmaster decides
 *    between storing and deleting the record.
 *  - For a non-persistent database where this node is dmaster and no
 *    read-only flags are set, an empty record is scheduled for deletion
 *    and a non-empty one is removed from the delete queue.
 *  - CTDB_REC_FLAG_VACUUM_MIGRATED and CTDB_REC_FLAG_AUTOMATIC are cleared
 *    in the caller-supplied header before it is written.
 *  - When seqnum updates are enabled and the payload is unchanged,
 *    TDB_SEQNUM is removed for the write and added back afterwards.
 *  - The record is written with tdb_storev(TDB_REPLACE) as header + data,
 *    or removed with tdb_delete.
 *  - The two delete-queue bookkeeping flags are reset after a failed
 *    store or delete, before the delete-queue helpers are consulted.
 *
 * NOTE(review): the key and data parameter lines, the assignments to
 * keep and the return statements are not visible here - confirm them
 * against the full function before relying on this summary.
 */
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
56 struct ctdb_ltdb_header *header,
59 struct ctdb_context *ctdb = ctdb_db->ctdb;
61 uint32_t hsize = sizeof(struct ctdb_ltdb_header);
63 bool seqnum_suppressed = false;
65 bool schedule_for_deletion = false;
66 bool remove_from_delete_queue = false;
69 if (ctdb->flags & CTDB_FLAG_TORTURE) {
71 struct ctdb_ltdb_header *h2;
73 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74 h2 = (struct ctdb_ltdb_header *)old.dptr;
75 if (old.dptr != NULL &&
77 h2->rsn > header->rsn) {
79 ("RSN regression! %"PRIu64" %"PRIu64"\n",
80 h2->rsn, header->rsn));
87 if (ctdb->vnn_map == NULL) {
89 * Called from a client: always store the record
90 * Also don't call ctdb_lmaster since it uses the vnn_map!
96 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
99 * If we migrate an empty record off to another node
100 * and the record has not been migrated with data,
101 * delete the record instead of storing the empty record.
103 if (data.dsize != 0) {
105 } else if (header->flags & CTDB_REC_RO_FLAGS) {
107 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
109 * The record is not created by the client but
110 * automatically by the ctdb_ltdb_fetch logic that
111 * creates a record with an initial header in the
112 * ltdb before trying to migrate the record from
113 * the current lmaster. Keep it instead of trying
114 * to delete the non-existing record...
117 schedule_for_deletion = true;
118 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
120 } else if (ctdb_db->ctdb->pnn == lmaster) {
122 * If we are lmaster, then we usually keep the record.
123 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
124 * and the record is empty and has never been migrated
125 * with data, then we should delete it instead of storing it.
126 * This is part of the vacuuming process.
128 * The reason that we usually need to store even empty records
129 * on the lmaster is that a client operating directly on the
130 * lmaster (== dmaster) expects the local copy of the record to
131 * exist after successful ctdb migrate call. If the record does
132 * not exist, the client goes into a migrate loop and eventually
133 * fails. So storing the empty record makes sure that we do not
134 * need to change the client code.
136 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
138 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
141 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
146 if (!ctdb_db->persistent &&
147 (ctdb_db->ctdb->pnn == header->dmaster) &&
148 !(header->flags & CTDB_REC_RO_FLAGS))
152 if (data.dsize == 0) {
153 schedule_for_deletion = true;
156 remove_from_delete_queue = !schedule_for_deletion;
161 * The VACUUM_MIGRATED flag is only set temporarily for
162 * the above logic when the record was retrieved by a
163 * VACUUM_MIGRATE call and should not be stored in the
166 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
167 * and there are two cases in which the corresponding record
168 * is stored in the local database:
169 * 1. The record has been migrated with data in the past
170 * (the MIGRATED_WITH_DATA record flag is set).
171 * 2. The record has been filled with data again since it
172 * had been submitted in the VACUUM_FETCH message to the
174 * For such records it is important to not store the
175 * VACUUM_MIGRATED flag in the database.
177 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
180 * Similarly, clear the AUTOMATIC flag which should not enter
181 * the local database copy since this would require client
182 * modifications to clear the flag when the client stores
185 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
187 rec[0].dsize = hsize;
188 rec[0].dptr = (uint8_t *)header;
190 rec[1].dsize = data.dsize;
191 rec[1].dptr = data.dptr;
193 /* Databases with seqnum updates enabled only get their seqnum
194 changes when/if we modify the data */
195 if (ctdb_db->seqnum_update != NULL) {
197 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
199 if ((old.dsize == hsize + data.dsize) &&
200 memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
201 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
202 seqnum_suppressed = true;
204 if (old.dptr != NULL) {
209 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
211 keep?"storing":"deleting",
215 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
217 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
224 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
229 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
232 keep?"store":"delete", ret,
233 tdb_errorstr(ctdb_db->ltdb->tdb)));
235 schedule_for_deletion = false;
236 remove_from_delete_queue = false;
238 if (seqnum_suppressed) {
239 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
242 if (schedule_for_deletion) {
244 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
246 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
250 if (remove_from_delete_queue) {
251 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
/*
 * State carried across a deferred (contended) chainlock request. It is
 * filled in by ctdb_ltdb_lock_requeue and consumed by lock_fetch_callback:
 * recv_pkt is re-invoked with recv_context and hdr, while generation and
 * ignore_generation let the callback drop packets that were queued under
 * a different database generation.
 *
 * NOTE(review): the recv_context and generation member declarations are
 * not visible here although both are assigned by ctdb_ltdb_lock_requeue.
 */
257 struct lock_fetch_state {
258 struct ctdb_context *ctdb;
259 struct ctdb_db_context *ctdb_db;
260 void (*recv_pkt)(void *, struct ctdb_req_header *);
262 struct ctdb_req_header *hdr;
264 bool ignore_generation;
268 called when we should retry the operation
/*
 * Callback handed to ctdb_lock_record by ctdb_ltdb_lock_requeue; p is the
 * struct lock_fetch_state for the deferred request.
 *
 * Unless ignore_generation is set, a request whose recorded generation
 * differs from the current ctdb_db->generation is discarded and its
 * packet header freed. Otherwise the packet is handed back to recv_pkt
 * together with recv_context.
 *
 * NOTE(review): the locked argument is not used in the visible lines and
 * the early return after the discard is not visible - confirm.
 */
270 static void lock_fetch_callback(void *p, bool locked)
272 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
273 if (!state->ignore_generation &&
274 state->generation != state->ctdb_db->generation) {
275 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
276 talloc_free(state->hdr);
279 state->recv_pkt(state->recv_context, state->hdr);
280 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
285 do a non-blocking ltdb_lock, deferring this ctdb request until we
288 It does the following:
290 1) tries to get the chainlock. If it succeeds, then it returns 0
292 2) if it fails to get a chainlock immediately then it sets up a
293 non-blocking chainlock via ctdb_lock_record, and when it gets the
294 chainlock it re-submits this ctdb request to the main packet
297 This effectively queues all ctdb requests that cannot be
298 immediately satisfied until it can get the lock. This means that
299 the main ctdb daemon will not block waiting for a chainlock held by
302 There are 3 possible return values:
304 0: means that it got the lock immediately.
305 -1: means that it failed to get the lock, and won't retry
306 -2: means that it failed to get the lock immediately, but will retry
/*
 * Try to take the chainlock for key without blocking; when the lock is
 * contended, queue the request so that recv_pkt(recv_context, hdr) is
 * called again once the lock has been obtained.
 *
 * Visible steps:
 *  - tdb_chainlock_nonblock is attempted first. A failure whose errno is
 *    not EACCES, EAGAIN or EDEADLK is treated as a hard failure.
 *  - With CTDB_FLAG_TORTURE set, a lock that was obtained may be released
 *    again so that the contended path gets exercised.
 *  - On the contended path a lock_fetch_state is allocated as a talloc
 *    child of hdr, filled with the callback details and the current
 *    database generation, and passed to ctdb_lock_record with
 *    lock_fetch_callback.
 *  - hdr is then moved under state with talloc_steal so that it outlives
 *    the temporary context of ctdb_input_pkt.
 *
 * NOTE(review): the result of talloc() is dereferenced on the next
 * visible line; no NULL check is visible between them - confirm.
 * NOTE(review): the return statements are not visible here.
 */
308 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
309 TDB_DATA key, struct ctdb_req_header *hdr,
310 void (*recv_pkt)(void *, struct ctdb_req_header *),
311 void *recv_context, bool ignore_generation)
314 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
315 struct lock_request *lreq;
316 struct lock_fetch_state *state;
318 ret = tdb_chainlock_nonblock(tdb, key);
321 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
322 /* a hard failure - don't try again */
326 /* when torturing, ensure we test the contended path */
327 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
330 tdb_chainunlock(tdb, key);
333 /* first the non-contended path */
338 state = talloc(hdr, struct lock_fetch_state);
339 state->ctdb = ctdb_db->ctdb;
340 state->ctdb_db = ctdb_db;
342 state->recv_pkt = recv_pkt;
343 state->recv_context = recv_context;
344 state->generation = ctdb_db->generation;
345 state->ignore_generation = ignore_generation;
347 /* now the contended path */
348 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
353 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
354 so it won't be freed yet */
355 talloc_steal(state, hdr);
357 /* now tell the caller that we will retry asynchronously */
362 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Take the chainlock for key through ctdb_ltdb_lock_requeue (same
 * deferral arguments) and read the record with ctdb_ltdb_fetch into
 * header and data. ctdb_ltdb_unlock is also called here and a failure
 * of that unlock is logged.
 *
 * NOTE(review): the conditions guarding the fetch and the unlock are not
 * visible - presumably the fetch runs only once the lock is held and the
 * unlock only when the fetch failed; confirm. The return statement is
 * not visible either.
 */
364 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
365 TDB_DATA key, struct ctdb_ltdb_header *header,
366 struct ctdb_req_header *hdr, TDB_DATA *data,
367 void (*recv_pkt)(void *, struct ctdb_req_header *),
368 void *recv_context, bool ignore_generation)
372 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
373 recv_context, ignore_generation);
375 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
378 uret = ctdb_ltdb_unlock(ctdb_db, key);
380 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
389 paranoid check to see if the db is empty
/*
 * Count the records of the local tdb with tdb_traverse_read (no callback
 * is supplied) and abort through ctdb_fatal when the database is reported
 * as not empty at attach time.
 *
 * NOTE(review): the comparison applied to count is not visible here.
 */
391 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
393 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
394 int count = tdb_traverse_read(tdb, NULL, NULL);
396 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
398 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Refresh ctdb_db->unhealthy_reason from the persistent health tdb
 * (ctdb->db_persistent_health).
 *
 * The lookup key is the database name without its trailing NUL (length
 * from strlen). The previous reason is remembered in old and the field
 * is cleared before the fetch. A fetched value is copied with
 * talloc_strndup as a child of ctdb_db; when that copy fails the
 * previous reason is put back. Otherwise the field is set to the new
 * copy.
 *
 * NOTE(review): the test on the fetched value, the release of val.dptr
 * and of the previous reason, and the return statements are not visible
 * here - confirm.
 */
402 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
403 struct ctdb_db_context *ctdb_db)
405 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
411 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
412 key.dsize = strlen(ctdb_db->db_name);
414 old = ctdb_db->unhealthy_reason;
415 ctdb_db->unhealthy_reason = NULL;
417 val = tdb_fetch(tdb, key);
419 reason = talloc_strndup(ctdb_db,
420 (const char *)val.dptr,
422 if (reason == NULL) {
423 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
425 ctdb_db->unhealthy_reason = old;
436 ctdb_db->unhealthy_reason = reason;
/*
 * Record the health of ctdb_db in the persistent health tdb inside a tdb
 * transaction and mirror the result in ctdb_db->unhealthy_reason.
 *
 * given_reason       text describing why the database is unhealthy;
 *                    NULL means healthy.
 * num_healthy_nodes  when this is 0 and a reason is already stored, the
 *                    stored reason is kept and prefixed with the
 *                    NO-HEALTHY-NODES marker unless strncmp shows that it
 *                    already starts with it.
 *
 * Visible steps: start the transaction, reload the current reason with
 * ctdb_load_persistent_health, build new_reason (talloc copy of
 * given_reason, or the prefixed old reason), then either store
 * new_reason under the database name with TDB_REPLACE or delete the key
 * when only an old reason exists, commit, free the old reason and
 * install new_reason in ctdb_db.
 *
 * A failing tdb_store or tdb_delete cancels the transaction.
 * NOTE(review): no tdb_transaction_cancel is visible on the earlier
 * failure paths (health reload, talloc failures) - confirm that the
 * transaction is not left open there. Return statements are not visible.
 */
440 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
441 struct ctdb_db_context *ctdb_db,
442 const char *given_reason,/* NULL means healthy */
443 int num_healthy_nodes)
445 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
449 char *new_reason = NULL;
450 char *old_reason = NULL;
452 ret = tdb_transaction_start(tdb);
454 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
455 tdb_name(tdb), ret, tdb_errorstr(tdb)));
459 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
461 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
462 ctdb_db->db_name, ret));
465 old_reason = ctdb_db->unhealthy_reason;
467 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
468 key.dsize = strlen(ctdb_db->db_name);
471 new_reason = talloc_strdup(ctdb_db, given_reason);
472 if (new_reason == NULL) {
473 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
477 } else if (old_reason && num_healthy_nodes == 0) {
479 * If the reason indicates ok, but there where no healthy nodes
480 * available, that it means, we have not recovered valid content
481 * of the db. So if there's an old reason, prefix it with
482 * "NO-HEALTHY-NODES - "
486 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
487 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
489 prefix = _TMP_PREFIX;
493 new_reason = talloc_asprintf(ctdb_db, "%s%s",
495 if (new_reason == NULL) {
496 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
497 prefix, old_reason));
504 val.dptr = discard_const_p(uint8_t, new_reason);
505 val.dsize = strlen(new_reason);
507 ret = tdb_store(tdb, key, val, TDB_REPLACE);
509 tdb_transaction_cancel(tdb);
510 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
511 tdb_name(tdb), ctdb_db->db_name, new_reason,
512 ret, tdb_errorstr(tdb)));
513 talloc_free(new_reason);
516 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
517 ctdb_db->db_name, new_reason));
518 } else if (old_reason) {
519 ret = tdb_delete(tdb, key);
521 tdb_transaction_cancel(tdb);
522 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
523 tdb_name(tdb), ctdb_db->db_name,
524 ret, tdb_errorstr(tdb)));
525 talloc_free(new_reason);
528 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
532 ret = tdb_transaction_commit(tdb);
533 if (ret != TDB_SUCCESS) {
534 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
535 tdb_name(tdb), ret, tdb_errorstr(tdb)));
536 talloc_free(new_reason);
540 talloc_free(old_reason);
541 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move a corrupted database file out of the way and record that fact in
 * the persistent health database.
 *
 * Visible steps: build a backup path ending in .corrupted.TIMESTAMP.0Z
 * from the broken-down time in tm, build a reason string that names the
 * backup path, store that reason through ctdb_update_persistent_health
 * with zero healthy nodes, then rename ctdb_db->db_path to the backup
 * path and log the outcome. new_reason and new_path are freed after use.
 *
 * NOTE(review): the call that fills tm from now, the first format
 * argument of the backup path and the return statements are not visible
 * here - confirm.
 */
546 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
547 struct ctdb_db_context *ctdb_db)
549 time_t now = time(NULL);
557 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
558 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
559 "%04u%02u%02u%02u%02u%02u.0Z",
561 tm->tm_year+1900, tm->tm_mon+1,
562 tm->tm_mday, tm->tm_hour, tm->tm_min,
564 if (new_path == NULL) {
565 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
569 new_reason = talloc_asprintf(ctdb_db,
570 "ERROR - Backup of corrupted TDB in '%s'",
572 if (new_reason == NULL) {
573 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
576 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
577 talloc_free(new_reason);
579 DEBUG(DEBUG_CRIT,(__location__
580 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
585 ret = rename(ctdb_db->db_path, new_path);
587 DEBUG(DEBUG_CRIT,(__location__
588 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
589 ctdb_db->db_path, new_path,
590 errno, strerror(errno)));
591 talloc_free(new_path);
595 DEBUG(DEBUG_CRIT,(__location__
596 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
597 ctdb_db->db_path, new_path));
598 talloc_free(new_path);
/*
 * Walk ctdb->db_list and reload the stored health of every persistent
 * database with ctdb_load_persistent_health. Each database is logged as
 * healthy (unhealthy_reason is NULL) or unhealthy together with its
 * reason, and a final summary line reports OK and FAIL counts.
 *
 * NOTE(review): the counters, the handling of non-persistent entries
 * (presumably skipped) and the return value are not visible here.
 */
602 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
604 struct ctdb_db_context *ctdb_db;
609 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
610 if (!ctdb_db->persistent) {
614 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
616 DEBUG(DEBUG_ALERT,(__location__
617 " load persistent health for '%s' failed\n",
622 if (ctdb_db->unhealthy_reason == NULL) {
624 DEBUG(DEBUG_INFO,(__location__
625 " persistent db '%s' healthy\n",
631 DEBUG(DEBUG_ALERT,(__location__
632 " persistent db '%s' unhealthy: %s\n",
634 ctdb_db->unhealthy_reason));
637 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
649 mark a database - as healthy
/*
 * Control handler that marks a database as healthy. indata carries the
 * 32-bit database id, which is read directly from indata.dptr.
 *
 * The database is looked up with find_ctdb_db, its stored health is
 * cleared through ctdb_update_persistent_health(NULL reason, one healthy
 * node), and when may_recover is set while the daemon is still in
 * CTDB_RUNSTATE_STARTUP the recovery mode is forced to
 * CTDB_RECOVERY_ACTIVE.
 *
 * NOTE(review): the statement that sets may_recover when an unhealthy
 * reason was present, any length check on indata and the return values
 * are not visible here - confirm.
 */
651 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
653 uint32_t db_id = *(uint32_t *)indata.dptr;
654 struct ctdb_db_context *ctdb_db;
656 bool may_recover = false;
658 ctdb_db = find_ctdb_db(ctdb, db_id);
660 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
664 if (ctdb_db->unhealthy_reason) {
668 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
670 DEBUG(DEBUG_ERR,(__location__
671 " ctdb_update_persistent_health(%s) failed\n",
676 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
677 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
679 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler that reports the health of a database. indata carries
 * the 32-bit database id, which is read directly from indata.dptr.
 *
 * The stored health is reloaded with ctdb_load_persistent_health. When
 * an unhealthy reason exists, outdata is pointed at that string (the
 * buffer stays owned by ctdb_db, it is not copied) and its size includes
 * the trailing NUL.
 *
 * NOTE(review): the outdata parameter line, the healthy-case output and
 * the return values are not visible here - confirm.
 */
685 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
689 uint32_t db_id = *(uint32_t *)indata.dptr;
690 struct ctdb_db_context *ctdb_db;
693 ctdb_db = find_ctdb_db(ctdb, db_id);
695 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
699 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
701 DEBUG(DEBUG_ERR,(__location__
702 " ctdb_load_persistent_health(%s) failed\n",
708 if (ctdb_db->unhealthy_reason) {
709 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
710 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
/*
 * Enable the readonly property on a database.
 *
 * Persistent databases are rejected. Otherwise a tracking tdb named
 * after db_path with an .RO suffix is opened (created if needed) with
 * TDB_NOLOCK, TDB_CLEAR_IF_FIRST and TDB_NOSYNC, the hash size taken
 * from the database_hash_size tunable and mode 0600. The handle is kept
 * in ctdb_db->rottdb and ctdb_db->readonly is set.
 *
 * NOTE(review): the branch taken when readonly is already set, the
 * release of ropath and the return values are not visible here.
 */
717 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
721 if (ctdb_db->readonly) {
725 if (ctdb_db->persistent) {
726 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
730 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
731 if (ropath == NULL) {
732 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
735 ctdb_db->rottdb = tdb_open(ropath,
736 ctdb->tunable.database_hash_size,
737 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
738 O_CREAT|O_RDWR, 0600);
739 if (ctdb_db->rottdb == NULL) {
740 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
745 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
747 ctdb_db->readonly = true;
749 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
756 attach to a database, handling both persistent and non-persistent databases
757 return 0 on success, -1 on failure
/*
 * Create and register the local context for database db_name.
 *
 * Visible steps, in order:
 *  - Allocate a zeroed ctdb_db_context under ctdb, copy the name and
 *    derive db_id with ctdb_hash over the name including its trailing
 *    NUL.
 *  - For a non-persistent database create the delete_queue tree; the
 *    store hook ctdb_ltdb_store_server is installed nearby.
 *  - Refuse the attach when an already attached database has the same
 *    db_id (hash collision).
 *  - Health handling: a supplied unhealthy_reason is recorded through
 *    ctdb_update_persistent_health, the stored health is loaded, and a
 *    database marked unhealthy fails the attach when remaining_tries is
 *    0 or is only warned about otherwise.
 *  - Build db_path from the persistent or volatile directory and open it
 *    with tdb_wrap_open. Base flags are TDB_DEFAULT for persistent and
 *    TDB_CLEAR_IF_FIRST with TDB_NOSYNC otherwise; TDB_NOMMAP is added
 *    when valgrinding, TDB_DISALLOW_NESTING and TDB_INCOMPATIBLE_HASH are
 *    added, and the mutex flags are added when the tunable, the mutexes
 *    argument and the runtime check all allow it.
 *  - On open or tdb_check failure the file may be moved aside with
 *    ctdb_backup_corrupted_tdb.
 *  - Create the deferred_fetch and defer_dmaster trees, add the context
 *    to ctdb->db_list, set the max dead tunable, register the null,
 *    fetch and fetch_with_header calls, and initialise vacuuming,
 *    migration tracking and the lock_log hash.
 *  - Copy the current vnn_map generation into the context.
 *
 * The header comment of this function states 0 on success, -1 on failure.
 *
 * NOTE(review): the guards that restrict the health and backup logic to
 * persistent databases, the condition on jenkinshash, the retry loop and
 * all return statements are not visible here - confirm.
 * NOTE(review): no NULL check of db_path is visible after
 * talloc_asprintf - confirm.
 * NOTE(review): several failure paths after DLIST_ADD free ctdb_db; no
 * matching removal from ctdb->db_list is visible - confirm that the list
 * cannot be left with a dangling entry.
 */
759 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
760 bool persistent, const char *unhealthy_reason,
761 bool jenkinshash, bool mutexes)
763 struct ctdb_db_context *ctdb_db, *tmp_db;
768 int remaining_tries = 0;
770 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
771 CTDB_NO_MEMORY(ctdb, ctdb_db);
773 ctdb_db->ctdb = ctdb;
774 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
775 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
777 key.dsize = strlen(db_name)+1;
778 key.dptr = discard_const(db_name);
779 ctdb_db->db_id = ctdb_hash(&key);
780 ctdb_db->persistent = persistent;
782 if (!ctdb_db->persistent) {
783 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
784 if (ctdb_db->delete_queue == NULL) {
785 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
788 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
791 /* check for hash collisions */
792 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
793 if (tmp_db->db_id == ctdb_db->db_id) {
794 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
795 tmp_db->db_id, db_name, tmp_db->db_name));
796 talloc_free(ctdb_db);
802 if (unhealthy_reason) {
803 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
804 unhealthy_reason, 0);
806 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
807 ctdb_db->db_name, unhealthy_reason, ret));
808 talloc_free(ctdb_db);
813 if (ctdb->max_persistent_check_errors > 0) {
816 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
820 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
822 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
823 ctdb_db->db_name, ret));
824 talloc_free(ctdb_db);
829 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
830 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
831 ctdb_db->db_name, ctdb_db->unhealthy_reason));
832 talloc_free(ctdb_db);
836 if (ctdb_db->unhealthy_reason) {
837 /* this is just a warning, but we want that in the log file! */
838 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
839 ctdb_db->db_name, ctdb_db->unhealthy_reason));
842 /* open the database */
843 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
844 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
847 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
848 if (ctdb->valgrinding) {
849 tdb_flags |= TDB_NOMMAP;
851 tdb_flags |= TDB_DISALLOW_NESTING;
853 tdb_flags |= TDB_INCOMPATIBLE_HASH;
855 #ifdef TDB_MUTEX_LOCKING
856 if (ctdb->tunable.mutex_enabled && mutexes &&
857 tdb_runtime_check_for_robust_mutexes()) {
858 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
863 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
864 ctdb->tunable.database_hash_size,
866 O_CREAT|O_RDWR, mode);
867 if (ctdb_db->ltdb == NULL) {
869 int saved_errno = errno;
872 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
875 strerror(saved_errno)));
876 talloc_free(ctdb_db);
880 if (remaining_tries == 0) {
881 DEBUG(DEBUG_CRIT,(__location__
882 "Failed to open persistent tdb '%s': %d - %s\n",
885 strerror(saved_errno)));
886 talloc_free(ctdb_db);
890 ret = stat(ctdb_db->db_path, &st);
892 DEBUG(DEBUG_CRIT,(__location__
893 "Failed to open persistent tdb '%s': %d - %s\n",
896 strerror(saved_errno)));
897 talloc_free(ctdb_db);
901 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
903 DEBUG(DEBUG_CRIT,(__location__
904 "Failed to open persistent tdb '%s': %d - %s\n",
907 strerror(saved_errno)));
908 talloc_free(ctdb_db);
918 ctdb_check_db_empty(ctdb_db);
920 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
925 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
926 ctdb_db->db_path, ret,
927 tdb_errorstr(ctdb_db->ltdb->tdb)));
928 if (remaining_tries == 0) {
929 talloc_free(ctdb_db);
933 fd = tdb_fd(ctdb_db->ltdb->tdb);
934 ret = fstat(fd, &st);
936 DEBUG(DEBUG_CRIT,(__location__
937 "Failed to fstat() persistent tdb '%s': %d - %s\n",
941 talloc_free(ctdb_db);
946 talloc_free(ctdb_db->ltdb);
947 ctdb_db->ltdb = NULL;
949 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
951 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
953 talloc_free(ctdb_db);
963 /* set up a rb tree we can use to track which records we have a
964 fetch-lock in-flight for so we can defer any additional calls
967 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
968 if (ctdb_db->deferred_fetch == NULL) {
969 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
970 talloc_free(ctdb_db);
974 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
975 if (ctdb_db->defer_dmaster == NULL) {
976 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
978 talloc_free(ctdb_db);
982 DLIST_ADD(ctdb->db_list, ctdb_db);
984 /* setting this can help some high churn databases */
985 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
988 all databases support the "null" function. we need this in
989 order to do forced migration of records
991 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
993 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
994 talloc_free(ctdb_db);
999 all databases support the "fetch" function. we need this
1000 for efficient Samba3 ctdb fetch
1002 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1004 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1005 talloc_free(ctdb_db);
1010 all databases support the "fetch_with_header" function. we need this
1011 for efficient readonly record fetches
1013 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1015 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1016 talloc_free(ctdb_db);
1020 ret = ctdb_vacuum_init(ctdb_db);
1022 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1023 "database '%s'\n", ctdb_db->db_name));
1024 talloc_free(ctdb_db);
1028 ret = ctdb_migration_init(ctdb_db);
1031 ("Failed to setup migration tracking for db '%s'\n",
1033 talloc_free(ctdb_db);
1037 ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1038 &ctdb_db->lock_log);
1041 ("Failed to setup lock logging for db '%s'\n",
1043 talloc_free(ctdb_db);
1047 ctdb_db->generation = ctdb->vnn_map->generation;
1049 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1050 ctdb_db->db_path, tdb_flags));
/*
 * One attach control that has been postponed by ctdb_control_db_attach.
 * next and prev link the entry into ctdb->deferred_attach, ctdb is the
 * owning daemon context and c is the control packet, which is
 * talloc-stolen onto this context when the request is deferred.
 */
1057 struct ctdb_deferred_attach_context {
1058 struct ctdb_deferred_attach_context *next, *prev;
1059 struct ctdb_context *ctdb;
1060 struct ctdb_req_control_old *c;
/*
 * talloc destructor for a deferred attach context (installed in
 * ctdb_control_db_attach): unlinks the entry from ctdb->deferred_attach.
 * NOTE(review): the return statement is not visible here.
 */
1064 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1066 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
 * Timer handler for a deferred attach that was not processed in time:
 * replies to the stored control with status -1 and frees the deferred
 * context given in private_data.
 */
1071 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1072 struct tevent_timer *te,
1073 struct timeval t, void *private_data)
1075 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1076 struct ctdb_context *ctdb = da_ctx->ctdb;
1078 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1079 talloc_free(da_ctx);
/*
 * Timer handler scheduled by ctdb_process_deferred_attach: feeds the
 * stored control packet back into ctdb_input_pkt so the attach is
 * processed again, then frees the deferred context given in
 * private_data.
 */
1082 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1083 struct tevent_timer *te,
1084 struct timeval t, void *private_data)
1086 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1087 struct ctdb_context *ctdb = da_ctx->ctdb;
1089 /* This talloc-steals the packet ->c */
1090 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1091 talloc_free(da_ctx);
/*
 * Drain ctdb->deferred_attach: every pending entry is unlinked from the
 * list and a timer is added on ctdb->ev that runs
 * ctdb_deferred_attach_callback for it, using timeval_current_ofs(1,0)
 * as the expiry time.
 * NOTE(review): the return value is not visible here.
 */
1094 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1096 struct ctdb_deferred_attach_context *da_ctx;
1098 /* call it from the main event loop as soon as the current event
1101 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1102 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1103 tevent_add_timer(ctdb->ev, da_ctx,
1104 timeval_current_ofs(1,0),
1105 ctdb_deferred_attach_callback, da_ctx);
1112 a client has asked to attach a new database
/*
 * Control handler for a database attach request. indata.dptr is used as
 * the database name, tdb_flags carries optional client flags and
 * client_id identifies a local client (0 is skipped for the lookup).
 *
 * Visible steps:
 *  - Deny the request when the allow_client_db_attach tunable is 0.
 *  - For a known local client: refuse while the node is inactive, and
 *    while recovery is active before the daemon reaches the RUNNING
 *    runstate (recovery daemon excepted) park the request in a
 *    ctdb_deferred_attach_context owned by the client, arm a timeout
 *    timer and set *async_reply.
 *  - Mask tdb_flags down to the permitted subset.
 *  - When the database is already attached, reject a persistent versus
 *    non-persistent mismatch and otherwise return its db_id in outdata.
 *  - Else attach with ctdb_local_attach, add the client flags to the
 *    tdb, return db_id in outdata, call lockdown_memory and broadcast
 *    the attach control to all nodes without asking for a reply.
 *
 * outdata points into the database context; the id is not copied.
 *
 * NOTE(review): the async_reply parameter line and the return statements
 * are not visible here. The name in indata is used as a C string; a
 * check for NUL termination is not visible - confirm.
 */
1114 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1115 TDB_DATA *outdata, uint64_t tdb_flags,
1116 bool persistent, uint32_t client_id,
1117 struct ctdb_req_control_old *c,
1120 const char *db_name = (const char *)indata.dptr;
1121 struct ctdb_db_context *db;
1122 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1123 struct ctdb_client *client = NULL;
1124 bool with_jenkinshash, with_mutexes;
1126 if (ctdb->tunable.allow_client_db_attach == 0) {
1127 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1128 "AllowClientDBAccess == 0\n", db_name));
1132 /* don't allow any local clients to attach while we are in recovery mode
1133 * except for the recovery daemon.
1134 * allow all attach from the network since these are always from remote
1137 if (client_id != 0) {
1138 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1140 if (client != NULL) {
1141 /* If the node is inactive it is not part of the cluster
1142 and we should not allow clients to attach to any
1145 if (node->flags & NODE_FLAGS_INACTIVE) {
1146 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1150 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1151 client->pid != ctdb->recoverd_pid &&
1152 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1153 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1155 if (da_ctx == NULL) {
1156 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1160 da_ctx->ctdb = ctdb;
1161 da_ctx->c = talloc_steal(da_ctx, c);
1162 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1163 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1165 tevent_add_timer(ctdb->ev, da_ctx,
1166 timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1167 ctdb_deferred_attach_timeout, da_ctx);
1169 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1170 *async_reply = true;
1175 /* the client can optionally pass additional tdb flags, but we
1176 only allow a subset of those on the database in ctdb. Note
1177 that tdb_flags is passed in via the (otherwise unused)
1178 srvid to the attach control */
1179 #ifdef TDB_MUTEX_LOCKING
1180 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1182 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1185 /* see if we already have this name */
1186 db = ctdb_db_handle(ctdb, db_name);
1188 if (db->persistent != persistent) {
1189 DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1190 "database %s\n", persistent ? "" : "non-",
1191 db-> persistent ? "" : "non-", db_name));
1194 outdata->dptr = (uint8_t *)&db->db_id;
1195 outdata->dsize = sizeof(db->db_id);
1199 with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1200 #ifdef TDB_MUTEX_LOCKING
1201 with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1203 with_mutexes = false;
1206 if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1207 with_jenkinshash, with_mutexes) != 0) {
1211 db = ctdb_db_handle(ctdb, db_name);
1213 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1217 /* remember the flags the client has specified */
1218 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1220 outdata->dptr = (uint8_t *)&db->db_id;
1221 outdata->dsize = sizeof(db->db_id);
1223 /* Try to ensure it's locked in mem */
1224 lockdown_memory(ctdb->valgrinding);
1226 /* tell all the other nodes about this database */
1227 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1228 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1229 CTDB_CONTROL_DB_ATTACH,
1230 0, CTDB_CTRL_FLAG_NOREPLY,
1231 indata, NULL, NULL);
1238 * a client has asked to detach from a database
/*
 * Control handler for a database detach request. indata carries the
 * 32-bit database id, which is read directly from indata.dptr.
 *
 * The request is refused for an unknown id, while the
 * allow_client_db_attach tunable is 1, for persistent databases and
 * while recovery is active.
 *
 * A request coming from a local client (client_id not 0 and still
 * known) is only forwarded to all nodes as a no-reply control; the
 * actual detach is done when the control arrives from a daemon. The
 * detach itself notifies the recovery daemon via
 * CTDB_SRVID_DETACH_DATABASE, frees the vacuum handle, the delete queue,
 * the deferred fetch tree, any traverses and active revoke children and
 * the readonly tracking tdb, unlinks the context from ctdb->db_list and
 * frees it.
 *
 * NOTE(review): the client_id parameter line and the return statements
 * are not visible here - confirm.
 */
1240 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1244 struct ctdb_db_context *ctdb_db;
1245 struct ctdb_client *client = NULL;
1247 db_id = *(uint32_t *)indata.dptr;
1248 ctdb_db = find_ctdb_db(ctdb, db_id);
1249 if (ctdb_db == NULL) {
1250 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1255 if (ctdb->tunable.allow_client_db_attach == 1) {
1256 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1257 "Clients are allowed access to databases "
1258 "(AllowClientDBAccess == 1)\n",
1263 if (ctdb_db->persistent) {
1264 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1265 "denied\n", ctdb_db->db_name));
1269 /* Cannot detach from database when in recovery */
1270 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1271 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1275 /* If a control comes from a client, then broadcast it to all nodes.
1276 * Do the actual detach only if the control comes from other daemons.
1278 if (client_id != 0) {
1279 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1280 if (client != NULL) {
1281 /* forward the control to all the nodes */
1282 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1283 CTDB_CONTROL_DB_DETACH, 0,
1284 CTDB_CTRL_FLAG_NOREPLY,
1285 indata, NULL, NULL);
1288 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1289 "for database '%s'\n", ctdb_db->db_name));
1293 /* Detach database from recoverd */
1294 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1295 CTDB_SRVID_DETACH_DATABASE,
1297 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1301 /* Disable vacuuming and drop all vacuuming data */
1302 talloc_free(ctdb_db->vacuum_handle);
1303 talloc_free(ctdb_db->delete_queue);
1305 /* Terminate any deferred fetch */
1306 talloc_free(ctdb_db->deferred_fetch);
1308 /* Terminate any traverses */
1309 while (ctdb_db->traverse) {
1310 talloc_free(ctdb_db->traverse);
1313 /* Terminate any revokes */
1314 while (ctdb_db->revokechild_active) {
1315 talloc_free(ctdb_db->revokechild_active);
1318 /* Free readonly tracking database */
1319 if (ctdb_db->readonly) {
1320 talloc_free(ctdb_db->rottdb);
1323 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1325 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1327 talloc_free(ctdb_db);
1333 attach to all existing persistent databases
/*
 * Scan the directory ctdb->db_directory_persistent and call
 * ctdb_local_attach() for each entry that passes the name checks below:
 * the name must contain ".tdb.", the text after it must consist of
 * digits only, and that number must equal this node's pnn.
 *
 * ctdb            - daemon context; supplies the directory and the pnn
 * unhealthy_reason - passed through unchanged to ctdb_local_attach()
 *
 * NOTE(review): several lines of this function (local declarations,
 * closing braces, return statements, cleanup calls) are not visible in
 * this excerpt, so return values and cleanup paths are deliberately not
 * described here - confirm against the full file.
 */
1335 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1336 const char *unhealthy_reason)
1341 /* open the persistent db directory and scan it for files */
1342 d = opendir(ctdb->db_directory_persistent);
1347 while ((de=readdir(d))) {
1349 size_t len = strlen(de->d_name);
1351 int invalid_name = 0;
/* work on a talloc'ed copy of the directory entry name */
1353 s = talloc_strdup(ctdb, de->d_name);
1356 CTDB_NO_MEMORY(ctdb, s);
1359 /* only consider names that contain ".tdb." and are at least 7 characters long */
1360 p = strstr(s, ".tdb.");
1361 if (len < 7 || p == NULL) {
1366 /* only accept names ending with .tdb. and any number of digits */
/*
 * q is presumably set to the first character after ".tdb." before this
 * loop (the assignment is not visible in this excerpt - TODO confirm).
 * NOTE(review): isdigit() receives a plain char here; the C standard
 * requires the argument to be representable as unsigned char or EOF, so
 * a cast to unsigned char would be safer for bytes >= 0x80.
 */
1368 while (*q != 0 && invalid_name == 0) {
1369 if (!isdigit(*q++)) {
/* skip the entry unless the numeric suffix parses and equals our own pnn */
1373 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1374 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
/* attach using the copied name; a non-zero result is logged as a failure */
1380 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1381 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1387 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Open the persistent-health tdb and then attach to the persistent
 * databases through ctdb_attach_persistent().
 *
 * The health tdb path is formatted from ctdb->db_directory_state and
 * PERSISTENT_HEALTH_TDB. The file is opened with tdb_wrap_open() and
 * then verified with tdb_check(). The failure branch of each of those
 * two steps contains the same recovery sequence: build an
 * unhealthy_reason warning string, log the retry, and reopen the file
 * with TDB_CLEAR_IF_FIRST. unhealthy_reason (NULL unless one of the
 * recovery sequences ran) is finally handed to ctdb_attach_persistent().
 *
 * NOTE(review): the control flow that joins these steps (the tests on
 * first_try, any labels/gotos, the return statements) is not visible in
 * this excerpt - confirm the exact ordering against the full file.
 * NOTE(review): the two recovery sequences are near-duplicates and look
 * like a candidate for a shared helper.
 */
1395 int ctdb_attach_databases(struct ctdb_context *ctdb)
1398 char *persistent_health_path = NULL;
1399 char *unhealthy_reason = NULL;
1400 bool first_try = true;
/* build the path of the health tdb below the state directory */
1402 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1403 ctdb->db_directory_state,
1404 PERSISTENT_HEALTH_TDB,
1406 if (persistent_health_path == NULL) {
1407 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* normal open: create the file if needed, owner read/write only */
1413 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1414 0, TDB_DISALLOW_NESTING,
1415 O_CREAT | O_RDWR, 0600);
1416 if (ctdb->db_persistent_health == NULL) {
1417 struct tdb_wrap *tdb;
/* give-up path for a failed open: log and release both strings */
1420 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1421 persistent_health_path,
1424 talloc_free(persistent_health_path);
1425 talloc_free(unhealthy_reason);
/* recovery path: remember why the databases must be treated as unhealthy */
1430 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1431 persistent_health_path,
1432 "was cleared after a failure",
1433 "manual verification needed");
1434 if (unhealthy_reason == NULL) {
1435 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1436 talloc_free(persistent_health_path);
/* reopen with TDB_CLEAR_IF_FIRST added to the flags */
1440 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1441 persistent_health_path));
1442 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1443 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1444 O_CREAT | O_RDWR, 0600);
1446 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1447 persistent_health_path,
1450 talloc_free(persistent_health_path);
1451 talloc_free(unhealthy_reason);
/* consistency check of the opened health tdb */
1458 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1460 struct tdb_wrap *tdb;
/* drop the handle that failed the check before trying to recover */
1462 talloc_free(ctdb->db_persistent_health);
1463 ctdb->db_persistent_health = NULL;
/* give-up path for a failed check: log and release both strings */
1466 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1467 persistent_health_path));
1468 talloc_free(persistent_health_path);
1469 talloc_free(unhealthy_reason);
/* same recovery sequence as in the open-failure branch above */
1474 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1475 persistent_health_path,
1476 "was cleared after a failure",
1477 "manual verification needed");
1478 if (unhealthy_reason == NULL) {
1479 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1480 talloc_free(persistent_health_path);
1484 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1485 persistent_health_path));
1486 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1487 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1488 O_CREAT | O_RDWR, 0600);
1490 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1491 persistent_health_path,
1494 talloc_free(persistent_health_path);
1495 talloc_free(unhealthy_reason);
/* the path string is no longer needed once the health tdb is usable */
1502 talloc_free(persistent_health_path);
/* attach the persistent databases, passing on the unhealthy reason (may be NULL) */
1504 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1505 talloc_free(unhealthy_reason);
1514 called when a broadcast seqnum update comes in
/*
 * Handle a sequence-number update for database db_id that was sent by
 * node srcnode.
 *
 * An update whose source is this node (srcnode == ctdb->pnn) is
 * special-cased first. Otherwise the database is looked up with
 * find_ctdb_db(). An error is logged for an unknown db_id (presumably
 * when the lookup fails; the test itself is not visible here) and for a
 * database that has unhealthy_reason set. In the remaining case the
 * sequence number of the local tdb is incremented and the new value is
 * cached in ctdb_db->seqnum.
 *
 * NOTE(review): the return statements of the individual branches are
 * not visible in this excerpt.
 * NOTE(review): the log text below spells "unhealty"; it is runtime
 * output and therefore left untouched here.
 */
1516 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1518 struct ctdb_db_context *ctdb_db;
1519 if (srcnode == ctdb->pnn) {
1520 /* don't update ourselves! */
1524 ctdb_db = find_ctdb_db(ctdb, db_id);
1526 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1530 if (ctdb_db->unhealthy_reason) {
1531 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1532 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* bump the local tdb seqnum and remember the resulting value */
1536 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1537 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1542 timer to check for seqnum changes in a ltdb and propagate them
/*
 * Timer callback that compares the current tdb sequence number of a
 * database with the cached value in ctdb_db->seqnum.
 *
 * ev, te, t - tevent timer callback arguments; not referenced in the
 *             visible lines
 * p         - the struct ctdb_db_context registered with the timer
 *
 * When the two values differ, a CTDB_CONTROL_UPDATE_SEQNUM control
 * carrying the 32-bit db_id is sent to CTDB_BROADCAST_VNNMAP with
 * CTDB_CTRL_FLAG_NOREPLY. The cached value is then set to the new
 * seqnum and the function re-registers itself as a timer on ctdb->ev.
 *
 * NOTE(review): closing braces are not visible in this excerpt, so it
 * cannot be told from here which statements sit inside the "changed"
 * branch.
 */
1544 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1545 struct tevent_timer *te,
1546 struct timeval t, void *p)
1548 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1549 struct ctdb_context *ctdb = ctdb_db->ctdb;
1550 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1551 if (new_seqnum != ctdb_db->seqnum) {
1552 /* something has changed - propagate it */
/* the control payload is just the database id */
1554 data.dptr = (uint8_t *)&ctdb_db->db_id;
1555 data.dsize = sizeof(uint32_t);
1556 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1557 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1560 ctdb_db->seqnum = new_seqnum;
1562 /* setup a new timer */
/*
 * seqnum_interval is split into interval/1000 and (interval%1000)*1000,
 * i.e. presumably milliseconds converted to seconds + microseconds -
 * TODO confirm against timeval_current_ofs().
 */
1563 ctdb_db->seqnum_update =
1564 tevent_add_timer(ctdb->ev, ctdb_db,
1565 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1566 (ctdb->tunable.seqnum_interval%1000)*1000),
1567 ctdb_ltdb_seqnum_check, ctdb_db);
1571 enable seqnum handling on this db
/*
 * Enable sequence-number handling for the database identified by db_id.
 *
 * The database is looked up with find_ctdb_db(); an error is logged for
 * an unknown db_id (presumably when the lookup fails; the test itself
 * is not visible here). If no seqnum_update timer is registered yet,
 * one is created with ctdb_ltdb_seqnum_check() as callback and ctdb_db
 * as its argument. Sequence numbers are then enabled on the local tdb
 * and the current value is cached in ctdb_db->seqnum.
 *
 * NOTE(review): the return statements are not visible in this excerpt.
 */
1573 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1575 struct ctdb_db_context *ctdb_db;
1576 ctdb_db = find_ctdb_db(ctdb, db_id);
1578 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* only register the periodic check once per database */
1582 if (ctdb_db->seqnum_update == NULL) {
1583 ctdb_db->seqnum_update = tevent_add_timer(
1585 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1586 (ctdb->tunable.seqnum_interval%1000)*1000),
1587 ctdb_ltdb_seqnum_check, ctdb_db);
/* turn on seqnum tracking in the tdb and cache the starting value */
1590 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1591 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Mark a database as sticky.
 *
 * A database that already has the sticky flag set is special-cased
 * first, and a persistent database is refused with an error log. For
 * the remaining case a tree is created with trbt_create() and stored in
 * ctdb_db->sticky_records, the sticky flag is set and a notice is
 * logged.
 *
 * The ctdb parameter is not referenced in the visible lines.
 *
 * NOTE(review): the return statements are not visible in this excerpt.
 * NOTE(review): no check of the trbt_create() result is visible before
 * the flag is set - confirm whether a NULL result needs handling.
 */
1595 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1597 if (ctdb_db->sticky) {
1601 if (ctdb_db->persistent) {
1602 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
/* the tree is allocated as a talloc child of the database context */
1606 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1608 ctdb_db->sticky = true;
1610 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
/*
 * Reset the statistics of a database.
 *
 * The key buffer of every hot-key slot with a non-zero key size is
 * released with talloc_free(), then the whole ctdb_db->statistics
 * structure is zeroed.
 */
1615 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1617 struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
/* free the key copies first; zeroing the struct would lose the pointers */
1620 for (i=0; i<MAX_HOT_KEYS; i++) {
1621 if (s->hot_keys[i].key.dsize > 0) {
1622 talloc_free(s->hot_keys[i].key.dptr);
1626 ZERO_STRUCT(ctdb_db->statistics);
/*
 * Build a flat copy of a database's statistics in outdata.
 *
 * The database is looked up with find_ctdb_db(); an error is logged for
 * an unknown db_id (presumably when the lookup fails; the test itself
 * is not visible here). The reply buffer is sized as the fixed part of
 * struct ctdb_db_statistics_old up to hot_keys_wire plus the sum of all
 * hot-key sizes. It is allocated as a talloc child of outdata. The
 * fixed part is copied first, num_hot_keys is set to MAX_HOT_KEYS, and
 * the key bytes of all slots are appended back to back starting at
 * hot_keys_wire. outdata->dptr and outdata->dsize are set to the buffer
 * and its length.
 *
 * NOTE(review): part of the parameter list, the local declarations and
 * the return statements are not visible in this excerpt.
 * NOTE(review): memcpy() is also called for slots whose key size is 0;
 * after ctdb_db_statistics_reset() such a slot has a NULL dptr, and a
 * NULL source is formally undefined even for a zero length - consider
 * skipping empty slots.
 */
1629 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1633 struct ctdb_db_context *ctdb_db;
1634 struct ctdb_db_statistics_old *stats;
1639 ctdb_db = find_ctdb_db(ctdb, db_id);
1641 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
/* total length = fixed header + all key payloads */
1645 len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1646 for (i = 0; i < MAX_HOT_KEYS; i++) {
1647 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1650 stats = talloc_size(outdata, len);
1651 if (stats == NULL) {
1652 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
/* copy everything that precedes the variable-length key area */
1656 memcpy(stats, &ctdb_db->statistics,
1657 offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1659 stats->num_hot_keys = MAX_HOT_KEYS;
/* append the raw key bytes of each slot, advancing the write pointer */
1661 ptr = &stats->hot_keys_wire[0];
1662 for (i = 0; i < MAX_HOT_KEYS; i++) {
1663 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1664 ctdb_db->statistics.hot_keys[i].key.dsize);
1665 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1668 outdata->dptr = (uint8_t *)stats;
1669 outdata->dsize = len;