2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 #include "server/ctdb_config.h"
46 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
49 * write a record to a normal database
51 * This is the server-variant of the ctdb_ltdb_store function.
52 * It contains logic to determine whether a record should be
53 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
54 * controls to the local ctdb daemon if apporpriate.
56 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
58 struct ctdb_ltdb_header *header,
61 struct ctdb_context *ctdb = ctdb_db->ctdb;
63 uint32_t hsize = sizeof(struct ctdb_ltdb_header);
66 bool schedule_for_deletion = false;
67 bool remove_from_delete_queue = false;
70 if (ctdb->flags & CTDB_FLAG_TORTURE) {
72 struct ctdb_ltdb_header *h2;
74 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
75 h2 = (struct ctdb_ltdb_header *)old.dptr;
76 if (old.dptr != NULL &&
78 h2->rsn > header->rsn) {
80 ("RSN regression! %"PRIu64" %"PRIu64"\n",
81 h2->rsn, header->rsn));
88 if (ctdb->vnn_map == NULL) {
90 * Called from a client: always store the record
91 * Also don't call ctdb_lmaster since it uses the vnn_map!
97 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
100 * If we migrate an empty record off to another node
101 * and the record has not been migrated with data,
102 * delete the record instead of storing the empty record.
104 if (data.dsize != 0) {
106 } else if (header->flags & CTDB_REC_RO_FLAGS) {
108 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
110 * The record is not created by the client but
111 * automatically by the ctdb_ltdb_fetch logic that
112 * creates a record with an initial header in the
113 * ltdb before trying to migrate the record from
114 * the current lmaster. Keep it instead of trying
115 * to delete the non-existing record...
118 schedule_for_deletion = true;
119 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
121 } else if (ctdb_db->ctdb->pnn == lmaster) {
123 * If we are lmaster, then we usually keep the record.
124 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
125 * and the record is empty and has never been migrated
126 * with data, then we should delete it instead of storing it.
127 * This is part of the vacuuming process.
129 * The reason that we usually need to store even empty records
130 * on the lmaster is that a client operating directly on the
131 * lmaster (== dmaster) expects the local copy of the record to
132 * exist after successful ctdb migrate call. If the record does
133 * not exist, the client goes into a migrate loop and eventually
134 * fails. So storing the empty record makes sure that we do not
135 * need to change the client code.
137 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
139 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
142 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
147 if (ctdb_db_volatile(ctdb_db) &&
148 (ctdb_db->ctdb->pnn == header->dmaster) &&
149 !(header->flags & CTDB_REC_RO_FLAGS))
153 if (data.dsize == 0) {
154 schedule_for_deletion = true;
157 remove_from_delete_queue = !schedule_for_deletion;
162 * The VACUUM_MIGRATED flag is only set temporarily for
163 * the above logic when the record was retrieved by a
164 * VACUUM_MIGRATE call and should not be stored in the
167 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
168 * and there are two cases in which the corresponding record
169 * is stored in the local database:
170 * 1. The record has been migrated with data in the past
171 * (the MIGRATED_WITH_DATA record flag is set).
172 * 2. The record has been filled with data again since it
173 * had been submitted in the VACUUM_FETCH message to the
175 * For such records it is important to not store the
176 * VACUUM_MIGRATED flag in the database.
178 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
181 * Similarly, clear the AUTOMATIC flag which should not enter
182 * the local database copy since this would require client
183 * modifications to clear the flag when the client stores
186 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
188 rec[0].dsize = hsize;
189 rec[0].dptr = (uint8_t *)header;
191 rec[1].dsize = data.dsize;
192 rec[1].dptr = data.dptr;
194 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
196 keep?"storing":"deleting",
200 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
202 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
209 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
214 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
217 keep?"store":"delete", ret,
218 tdb_errorstr(ctdb_db->ltdb->tdb)));
220 schedule_for_deletion = false;
221 remove_from_delete_queue = false;
224 if (schedule_for_deletion) {
226 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
228 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
232 if (remove_from_delete_queue) {
233 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
239 struct lock_fetch_state {
240 struct ctdb_context *ctdb;
241 struct ctdb_db_context *ctdb_db;
242 void (*recv_pkt)(void *, struct ctdb_req_header *);
244 struct ctdb_req_header *hdr;
246 bool ignore_generation;
250 called when we should retry the operation
252 static void lock_fetch_callback(void *p, bool locked)
254 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
255 if (!state->ignore_generation &&
256 state->generation != state->ctdb_db->generation) {
257 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
258 talloc_free(state->hdr);
261 state->recv_pkt(state->recv_context, state->hdr);
262 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
267 do a non-blocking ltdb_lock, deferring this ctdb request until we
270 It does the following:
272 1) tries to get the chainlock. If it succeeds, then it returns 0
274 2) if it fails to get a chainlock immediately then it sets up a
275 non-blocking chainlock via ctdb_lock_record, and when it gets the
276 chainlock it re-submits this ctdb request to the main packet
279 This effectively queues all ctdb requests that cannot be
280 immediately satisfied until it can get the lock. This means that
281 the main ctdb daemon will not block waiting for a chainlock held by
284 There are 3 possible return values:
286 0: means that it got the lock immediately.
287 -1: means that it failed to get the lock, and won't retry
288 -2: means that it failed to get the lock immediately, but will retry
290 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
291 TDB_DATA key, struct ctdb_req_header *hdr,
292 void (*recv_pkt)(void *, struct ctdb_req_header *),
293 void *recv_context, bool ignore_generation)
296 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
297 struct lock_request *lreq;
298 struct lock_fetch_state *state;
300 ret = tdb_chainlock_nonblock(tdb, key);
303 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
304 /* a hard failure - don't try again */
308 /* when torturing, ensure we test the contended path */
309 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
312 tdb_chainunlock(tdb, key);
315 /* first the non-contended path */
320 state = talloc(hdr, struct lock_fetch_state);
321 state->ctdb = ctdb_db->ctdb;
322 state->ctdb_db = ctdb_db;
324 state->recv_pkt = recv_pkt;
325 state->recv_context = recv_context;
326 state->generation = ctdb_db->generation;
327 state->ignore_generation = ignore_generation;
329 /* now the contended path */
330 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
335 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
336 so it won't be freed yet */
337 talloc_steal(state, hdr);
339 /* now tell the caller than we will retry asynchronously */
344 a varient of ctdb_ltdb_lock_requeue that also fetches the record
346 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
347 TDB_DATA key, struct ctdb_ltdb_header *header,
348 struct ctdb_req_header *hdr, TDB_DATA *data,
349 void (*recv_pkt)(void *, struct ctdb_req_header *),
350 void *recv_context, bool ignore_generation)
354 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
355 recv_context, ignore_generation);
360 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
363 uret = ctdb_ltdb_unlock(ctdb_db, key);
365 DBG_ERR("ctdb_ltdb_unlock() failed with error %d\n",
374 paranoid check to see if the db is empty
376 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
378 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
379 int count = tdb_traverse_read(tdb, NULL, NULL);
381 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
383 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
387 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
388 struct ctdb_db_context *ctdb_db)
390 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
396 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
397 key.dsize = strlen(ctdb_db->db_name);
399 old = ctdb_db->unhealthy_reason;
400 ctdb_db->unhealthy_reason = NULL;
402 val = tdb_fetch(tdb, key);
404 reason = talloc_strndup(ctdb_db,
405 (const char *)val.dptr,
407 if (reason == NULL) {
408 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
410 ctdb_db->unhealthy_reason = old;
421 ctdb_db->unhealthy_reason = reason;
425 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
426 struct ctdb_db_context *ctdb_db,
427 const char *given_reason,/* NULL means healthy */
428 int num_healthy_nodes)
430 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
434 char *new_reason = NULL;
435 char *old_reason = NULL;
437 ret = tdb_transaction_start(tdb);
439 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
440 tdb_name(tdb), ret, tdb_errorstr(tdb)));
444 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
446 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
447 ctdb_db->db_name, ret));
450 old_reason = ctdb_db->unhealthy_reason;
452 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
453 key.dsize = strlen(ctdb_db->db_name);
456 new_reason = talloc_strdup(ctdb_db, given_reason);
457 if (new_reason == NULL) {
458 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
462 } else if (old_reason && num_healthy_nodes == 0) {
464 * If the reason indicates ok, but there where no healthy nodes
465 * available, that it means, we have not recovered valid content
466 * of the db. So if there's an old reason, prefix it with
467 * "NO-HEALTHY-NODES - "
471 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
472 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
474 prefix = _TMP_PREFIX;
478 new_reason = talloc_asprintf(ctdb_db, "%s%s",
480 if (new_reason == NULL) {
481 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
482 prefix, old_reason));
489 val.dptr = discard_const_p(uint8_t, new_reason);
490 val.dsize = strlen(new_reason);
492 ret = tdb_store(tdb, key, val, TDB_REPLACE);
494 tdb_transaction_cancel(tdb);
495 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
496 tdb_name(tdb), ctdb_db->db_name, new_reason,
497 ret, tdb_errorstr(tdb)));
498 talloc_free(new_reason);
501 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
502 ctdb_db->db_name, new_reason));
503 } else if (old_reason) {
504 ret = tdb_delete(tdb, key);
506 tdb_transaction_cancel(tdb);
507 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
508 tdb_name(tdb), ctdb_db->db_name,
509 ret, tdb_errorstr(tdb)));
510 talloc_free(new_reason);
513 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
517 ret = tdb_transaction_commit(tdb);
518 if (ret != TDB_SUCCESS) {
519 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
520 tdb_name(tdb), ret, tdb_errorstr(tdb)));
521 talloc_free(new_reason);
525 talloc_free(old_reason);
526 ctdb_db->unhealthy_reason = new_reason;
531 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
532 struct ctdb_db_context *ctdb_db)
534 time_t now = time(NULL);
542 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
543 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
544 "%04u%02u%02u%02u%02u%02u.0Z",
546 tm->tm_year+1900, tm->tm_mon+1,
547 tm->tm_mday, tm->tm_hour, tm->tm_min,
549 if (new_path == NULL) {
550 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
554 new_reason = talloc_asprintf(ctdb_db,
555 "ERROR - Backup of corrupted TDB in '%s'",
557 if (new_reason == NULL) {
558 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
561 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
562 talloc_free(new_reason);
564 DEBUG(DEBUG_CRIT,(__location__
565 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
570 ret = rename(ctdb_db->db_path, new_path);
572 DEBUG(DEBUG_CRIT,(__location__
573 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
574 ctdb_db->db_path, new_path,
575 errno, strerror(errno)));
576 talloc_free(new_path);
580 DEBUG(DEBUG_CRIT,(__location__
581 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
582 ctdb_db->db_path, new_path));
583 talloc_free(new_path);
587 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
589 struct ctdb_db_context *ctdb_db;
594 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
595 if (!ctdb_db_persistent(ctdb_db)) {
599 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
601 DEBUG(DEBUG_ALERT,(__location__
602 " load persistent health for '%s' failed\n",
607 if (ctdb_db->unhealthy_reason == NULL) {
609 DEBUG(DEBUG_INFO,(__location__
610 " persistent db '%s' healthy\n",
616 DEBUG(DEBUG_ALERT,(__location__
617 " persistent db '%s' unhealthy: %s\n",
619 ctdb_db->unhealthy_reason));
622 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
634 mark a database - as healthy
636 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
638 uint32_t db_id = *(uint32_t *)indata.dptr;
639 struct ctdb_db_context *ctdb_db;
641 bool may_recover = false;
643 ctdb_db = find_ctdb_db(ctdb, db_id);
645 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
649 if (ctdb_db->unhealthy_reason) {
653 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
655 DEBUG(DEBUG_ERR,(__location__
656 " ctdb_update_persistent_health(%s) failed\n",
661 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
662 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
664 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
670 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
674 uint32_t db_id = *(uint32_t *)indata.dptr;
675 struct ctdb_db_context *ctdb_db;
678 ctdb_db = find_ctdb_db(ctdb, db_id);
680 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
684 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
686 DEBUG(DEBUG_ERR,(__location__
687 " ctdb_load_persistent_health(%s) failed\n",
693 if (ctdb_db->unhealthy_reason) {
694 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
695 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
702 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
706 if (ctdb_db_readonly(ctdb_db)) {
710 if (! ctdb_db_volatile(ctdb_db)) {
712 ("Non-volatile databases do not support readonly flag\n"));
716 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
717 if (ropath == NULL) {
718 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
721 ctdb_db->rottdb = tdb_open(ropath,
722 ctdb->tunable.database_hash_size,
723 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
724 O_CREAT|O_RDWR, 0600);
725 if (ctdb_db->rottdb == NULL) {
726 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
731 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
733 ctdb_db_set_readonly(ctdb_db);
735 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
742 attach to a database, handling both persistent and non-persistent databases
743 return 0 on success, -1 on failure
745 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
746 uint8_t db_flags, const char *unhealthy_reason)
748 struct ctdb_db_context *ctdb_db, *tmp_db;
753 int remaining_tries = 0;
755 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
756 CTDB_NO_MEMORY(ctdb, ctdb_db);
758 ctdb_db->ctdb = ctdb;
759 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
760 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
762 key.dsize = strlen(db_name)+1;
763 key.dptr = discard_const(db_name);
764 ctdb_db->db_id = ctdb_hash(&key);
765 ctdb_db->db_flags = db_flags;
767 if (ctdb_db_volatile(ctdb_db)) {
768 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
769 if (ctdb_db->delete_queue == NULL) {
770 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
773 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
776 /* check for hash collisions */
777 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
778 if (tmp_db->db_id == ctdb_db->db_id) {
779 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
780 tmp_db->db_id, db_name, tmp_db->db_name));
781 talloc_free(ctdb_db);
786 if (ctdb_db_persistent(ctdb_db)) {
787 if (unhealthy_reason) {
788 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
789 unhealthy_reason, 0);
791 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
792 ctdb_db->db_name, unhealthy_reason, ret));
793 talloc_free(ctdb_db);
798 if (ctdb->max_persistent_check_errors > 0) {
801 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
805 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
807 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
808 ctdb_db->db_name, ret));
809 talloc_free(ctdb_db);
814 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
815 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
816 ctdb_db->db_name, ctdb_db->unhealthy_reason));
817 talloc_free(ctdb_db);
821 if (ctdb_db->unhealthy_reason) {
822 /* this is just a warning, but we want that in the log file! */
823 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
824 ctdb_db->db_name, ctdb_db->unhealthy_reason));
827 /* open the database */
828 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
829 ctdb_db_persistent(ctdb_db) ?
830 ctdb->db_directory_persistent :
834 tdb_flags = ctdb_db_tdb_flags(db_flags,
836 ctdb_config.tdb_mutexes);
839 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
840 ctdb->tunable.database_hash_size,
842 O_CREAT|O_RDWR, mode);
843 if (ctdb_db->ltdb == NULL) {
845 int saved_errno = errno;
847 if (! ctdb_db_persistent(ctdb_db)) {
848 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
851 strerror(saved_errno)));
852 talloc_free(ctdb_db);
856 if (remaining_tries == 0) {
857 DEBUG(DEBUG_CRIT,(__location__
858 "Failed to open persistent tdb '%s': %d - %s\n",
861 strerror(saved_errno)));
862 talloc_free(ctdb_db);
866 ret = stat(ctdb_db->db_path, &st);
868 DEBUG(DEBUG_CRIT,(__location__
869 "Failed to open persistent tdb '%s': %d - %s\n",
872 strerror(saved_errno)));
873 talloc_free(ctdb_db);
877 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
879 DEBUG(DEBUG_CRIT,(__location__
880 "Failed to open persistent tdb '%s': %d - %s\n",
883 strerror(saved_errno)));
884 talloc_free(ctdb_db);
893 if (!ctdb_db_persistent(ctdb_db)) {
894 ctdb_check_db_empty(ctdb_db);
896 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
901 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
902 ctdb_db->db_path, ret,
903 tdb_errorstr(ctdb_db->ltdb->tdb)));
904 if (remaining_tries == 0) {
905 talloc_free(ctdb_db);
909 fd = tdb_fd(ctdb_db->ltdb->tdb);
910 ret = fstat(fd, &st);
912 DEBUG(DEBUG_CRIT,(__location__
913 "Failed to fstat() persistent tdb '%s': %d - %s\n",
917 talloc_free(ctdb_db);
922 talloc_free(ctdb_db->ltdb);
923 ctdb_db->ltdb = NULL;
925 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
927 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
929 talloc_free(ctdb_db);
939 /* remember the flags the client has specified */
940 tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
943 /* set up a rb tree we can use to track which records we have a
944 fetch-lock in-flight for so we can defer any additional calls
947 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
948 if (ctdb_db->deferred_fetch == NULL) {
949 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
950 talloc_free(ctdb_db);
954 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
955 if (ctdb_db->defer_dmaster == NULL) {
956 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
958 talloc_free(ctdb_db);
962 DLIST_ADD(ctdb->db_list, ctdb_db);
964 /* setting this can help some high churn databases */
965 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
968 all databases support the "null" function. we need this in
969 order to do forced migration of records
971 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
973 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
974 talloc_free(ctdb_db);
979 all databases support the "fetch" function. we need this
980 for efficient Samba3 ctdb fetch
982 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
984 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
985 talloc_free(ctdb_db);
990 all databases support the "fetch_with_header" function. we need this
991 for efficient readonly record fetches
993 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
995 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
996 talloc_free(ctdb_db);
1000 ret = ctdb_vacuum_init(ctdb_db);
1002 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1003 "database '%s'\n", ctdb_db->db_name));
1004 talloc_free(ctdb_db);
1008 ret = ctdb_migration_init(ctdb_db);
1011 ("Failed to setup migration tracking for db '%s'\n",
1013 talloc_free(ctdb_db);
1017 ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1018 &ctdb_db->lock_log);
1021 ("Failed to setup lock logging for db '%s'\n",
1023 talloc_free(ctdb_db);
1027 ctdb_db->generation = ctdb->vnn_map->generation;
1029 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1030 ctdb_db->db_path, tdb_flags));
1037 struct ctdb_deferred_attach_context {
1038 struct ctdb_deferred_attach_context *next, *prev;
1039 struct ctdb_context *ctdb;
1040 struct ctdb_req_control_old *c;
1044 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1046 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1051 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1052 struct tevent_timer *te,
1053 struct timeval t, void *private_data)
1055 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1056 struct ctdb_context *ctdb = da_ctx->ctdb;
1058 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1059 talloc_free(da_ctx);
1062 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1063 struct tevent_timer *te,
1064 struct timeval t, void *private_data)
1066 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1067 struct ctdb_context *ctdb = da_ctx->ctdb;
1069 /* This talloc-steals the packet ->c */
1070 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1071 talloc_free(da_ctx);
1074 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1076 struct ctdb_deferred_attach_context *da_ctx;
1078 /* call it from the main event loop as soon as the current event
1081 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1082 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1083 tevent_add_timer(ctdb->ev, da_ctx,
1084 timeval_current_ofs(1,0),
1085 ctdb_deferred_attach_callback, da_ctx);
1092 a client has asked to attach a new database
1094 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb,
1100 struct ctdb_req_control_old *c,
1103 const char *db_name = (const char *)indata.dptr;
1104 struct ctdb_db_context *db;
1105 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1106 struct ctdb_client *client = NULL;
1109 if (ctdb->tunable.allow_client_db_attach == 0) {
1110 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1111 "AllowClientDBAccess == 0\n", db_name));
1115 /* don't allow any local clients to attach while we are in recovery mode
1116 * except for the recovery daemon.
1117 * allow all attach from the network since these are always from remote
1120 if (srcnode == ctdb->pnn && client_id != 0) {
1121 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1123 if (client != NULL) {
1124 /* If the node is inactive it is not part of the cluster
1125 and we should not allow clients to attach to any
1128 if (node->flags & NODE_FLAGS_INACTIVE) {
1129 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1133 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1134 client->pid != ctdb->recoverd_pid &&
1135 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1136 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1138 if (da_ctx == NULL) {
1139 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1143 da_ctx->ctdb = ctdb;
1144 da_ctx->c = talloc_steal(da_ctx, c);
1145 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1146 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1148 tevent_add_timer(ctdb->ev, da_ctx,
1149 timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1150 ctdb_deferred_attach_timeout, da_ctx);
1152 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1153 *async_reply = true;
1158 /* see if we already have this name */
1159 db = ctdb_db_handle(ctdb, db_name);
1161 if ((db->db_flags & db_flags) != db_flags) {
1163 ("Error: Failed to re-attach with 0x%x flags,"
1164 " database has 0x%x flags\n", db_flags,
1168 outdata->dptr = (uint8_t *)&db->db_id;
1169 outdata->dsize = sizeof(db->db_id);
1173 if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
1177 db = ctdb_db_handle(ctdb, db_name);
1179 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1183 outdata->dptr = (uint8_t *)&db->db_id;
1184 outdata->dsize = sizeof(db->db_id);
1186 /* Try to ensure it's locked in mem */
1187 lockdown_memory(ctdb->valgrinding);
1189 if (ctdb_db_persistent(db)) {
1190 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1191 } else if (ctdb_db_replicated(db)) {
1192 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1194 opcode = CTDB_CONTROL_DB_ATTACH;
1197 /* tell all the other nodes about this database */
1198 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, opcode,
1199 0, CTDB_CTRL_FLAG_NOREPLY,
1200 indata, NULL, NULL);
1207 * a client has asked to detach from a database
1209 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1213 struct ctdb_db_context *ctdb_db;
1214 struct ctdb_client *client = NULL;
1216 db_id = *(uint32_t *)indata.dptr;
1217 ctdb_db = find_ctdb_db(ctdb, db_id);
1218 if (ctdb_db == NULL) {
1219 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1224 if (ctdb->tunable.allow_client_db_attach == 1) {
1225 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1226 "Clients are allowed access to databases "
1227 "(AllowClientDBAccess == 1)\n",
1232 if (! ctdb_db_volatile(ctdb_db)) {
1234 ("Detaching non-volatile database %s denied\n",
1239 /* Cannot detach from database when in recovery */
1240 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1241 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1245 /* If a control comes from a client, then broadcast it to all nodes.
1246 * Do the actual detach only if the control comes from other daemons.
1248 if (client_id != 0) {
1249 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1250 if (client != NULL) {
1251 /* forward the control to all the nodes */
1252 ctdb_daemon_send_control(ctdb,
1253 CTDB_BROADCAST_CONNECTED, 0,
1254 CTDB_CONTROL_DB_DETACH, 0,
1255 CTDB_CTRL_FLAG_NOREPLY,
1256 indata, NULL, NULL);
1259 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1260 "for database '%s'\n", ctdb_db->db_name));
1264 /* Detach database from recoverd */
1265 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1266 CTDB_SRVID_DETACH_DATABASE,
1268 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1272 /* Disable vacuuming and drop all vacuuming data */
1273 talloc_free(ctdb_db->vacuum_handle);
1274 talloc_free(ctdb_db->delete_queue);
1276 /* Terminate any deferred fetch */
1277 talloc_free(ctdb_db->deferred_fetch);
1279 /* Terminate any traverses */
1280 while (ctdb_db->traverse) {
1281 talloc_free(ctdb_db->traverse);
1284 /* Terminate any revokes */
1285 while (ctdb_db->revokechild_active) {
1286 talloc_free(ctdb_db->revokechild_active);
1289 /* Free readonly tracking database */
1290 if (ctdb_db_readonly(ctdb_db)) {
1291 talloc_free(ctdb_db->rottdb);
1294 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1296 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1298 talloc_free(ctdb_db);
1304 attach to all existing persistent databases
1306 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1307 const char *unhealthy_reason)
1312 /* open the persistent db directory and scan it for files */
1313 d = opendir(ctdb->db_directory_persistent);
1318 while ((de=readdir(d))) {
1320 size_t len = strlen(de->d_name);
1322 int invalid_name = 0;
1324 s = talloc_strdup(ctdb, de->d_name);
1327 CTDB_NO_MEMORY(ctdb, s);
1330 /* only accept names ending in .tdb */
1331 p = strstr(s, ".tdb.");
1332 if (len < 7 || p == NULL) {
1337 /* only accept names ending with .tdb. and any number of digits */
1339 while (*q != 0 && invalid_name == 0) {
1340 if (!isdigit(*q++)) {
1344 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1345 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1351 if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
1352 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1358 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1366 int ctdb_attach_databases(struct ctdb_context *ctdb)
1369 char *persistent_health_path = NULL;
1370 char *unhealthy_reason = NULL;
1371 bool first_try = true;
1373 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1374 ctdb->db_directory_state,
1375 PERSISTENT_HEALTH_TDB,
1377 if (persistent_health_path == NULL) {
1378 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1384 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1385 0, TDB_DISALLOW_NESTING,
1386 O_CREAT | O_RDWR, 0600);
1387 if (ctdb->db_persistent_health == NULL) {
1388 struct tdb_wrap *tdb;
1391 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1392 persistent_health_path,
1395 talloc_free(persistent_health_path);
1396 talloc_free(unhealthy_reason);
1401 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1402 persistent_health_path,
1403 "was cleared after a failure",
1404 "manual verification needed");
1405 if (unhealthy_reason == NULL) {
1406 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1407 talloc_free(persistent_health_path);
1411 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1412 persistent_health_path));
1413 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1414 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1415 O_CREAT | O_RDWR, 0600);
1417 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1418 persistent_health_path,
1421 talloc_free(persistent_health_path);
1422 talloc_free(unhealthy_reason);
1429 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1431 struct tdb_wrap *tdb;
1433 talloc_free(ctdb->db_persistent_health);
1434 ctdb->db_persistent_health = NULL;
1437 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1438 persistent_health_path));
1439 talloc_free(persistent_health_path);
1440 talloc_free(unhealthy_reason);
1445 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1446 persistent_health_path,
1447 "was cleared after a failure",
1448 "manual verification needed");
1449 if (unhealthy_reason == NULL) {
1450 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1451 talloc_free(persistent_health_path);
1455 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1456 persistent_health_path));
1457 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1458 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1459 O_CREAT | O_RDWR, 0600);
1461 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1462 persistent_health_path,
1465 talloc_free(persistent_health_path);
1466 talloc_free(unhealthy_reason);
1473 talloc_free(persistent_health_path);
1475 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1476 talloc_free(unhealthy_reason);
1485 called when a broadcast seqnum update comes in
1487 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1489 struct ctdb_db_context *ctdb_db;
1490 if (srcnode == ctdb->pnn) {
1491 /* don't update ourselves! */
1495 ctdb_db = find_ctdb_db(ctdb, db_id);
1497 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1501 if (ctdb_db->unhealthy_reason) {
1502 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1503 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1507 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1508 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1513 timer to check for seqnum changes in a ltdb and propagate them
1515 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1516 struct tevent_timer *te,
1517 struct timeval t, void *p)
1519 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1520 struct ctdb_context *ctdb = ctdb_db->ctdb;
1521 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1522 if (new_seqnum != ctdb_db->seqnum) {
1523 /* something has changed - propagate it */
1525 data.dptr = (uint8_t *)&ctdb_db->db_id;
1526 data.dsize = sizeof(uint32_t);
1527 ctdb_daemon_send_control(ctdb,
1528 CTDB_BROADCAST_ACTIVE,
1530 CTDB_CONTROL_UPDATE_SEQNUM,
1532 CTDB_CTRL_FLAG_NOREPLY,
1537 ctdb_db->seqnum = new_seqnum;
1539 /* setup a new timer */
1540 ctdb_db->seqnum_update =
1541 tevent_add_timer(ctdb->ev, ctdb_db,
1542 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1543 (ctdb->tunable.seqnum_interval%1000)*1000),
1544 ctdb_ltdb_seqnum_check, ctdb_db);
1548 enable seqnum handling on this db
1550 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1552 struct ctdb_db_context *ctdb_db;
1553 ctdb_db = find_ctdb_db(ctdb, db_id);
1555 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1559 if (ctdb_db->seqnum_update == NULL) {
1560 ctdb_db->seqnum_update = tevent_add_timer(
1562 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1563 (ctdb->tunable.seqnum_interval%1000)*1000),
1564 ctdb_ltdb_seqnum_check, ctdb_db);
1567 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1568 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1572 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1574 if (ctdb_db_sticky(ctdb_db)) {
1578 if (! ctdb_db_volatile(ctdb_db)) {
1580 ("Non-volatile databases do not support sticky flag\n"));
1584 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1586 ctdb_db_set_sticky(ctdb_db);
1588 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1593 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1595 struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1598 for (i=0; i<MAX_HOT_KEYS; i++) {
1599 if (s->hot_keys[i].key.dsize > 0) {
1600 talloc_free(s->hot_keys[i].key.dptr);
1604 ZERO_STRUCT(ctdb_db->statistics);
1607 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1611 struct ctdb_db_context *ctdb_db;
1612 struct ctdb_db_statistics_old *stats;
1617 ctdb_db = find_ctdb_db(ctdb, db_id);
1619 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1623 len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1624 for (i = 0; i < MAX_HOT_KEYS; i++) {
1625 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1628 stats = talloc_size(outdata, len);
1629 if (stats == NULL) {
1630 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1634 memcpy(stats, &ctdb_db->statistics,
1635 offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1637 stats->num_hot_keys = MAX_HOT_KEYS;
1639 ptr = &stats->hot_keys_wire[0];
1640 for (i = 0; i < MAX_HOT_KEYS; i++) {
1641 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1642 ctdb_db->statistics.hot_keys[i].key.dsize);
1643 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1646 outdata->dptr = (uint8_t *)stats;
1647 outdata->dsize = len;