2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 #include "server/ctdb_config.h"
46 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
49 * write a record to a normal database
51 * This is the server-variant of the ctdb_ltdb_store function.
52 * It contains logic to determine whether a record should be
53 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
54 * controls to the local ctdb daemon if appropriate.
/*
 * Server-side store hook for the local tdb of a database.
 *
 * ctdb_db: database whose ltdb is written.
 * header:  ltdb header of the record. The VACUUM_MIGRATED and AUTOMATIC
 *          flags are cleared on it before it is written, so the caller's
 *          header is modified.
 * The body also uses "key" and "data" (record key and payload).
 *
 * Outline of the visible logic:
 *  - With CTDB_FLAG_TORTURE set, the existing record is fetched and an
 *    "RSN regression!" message is produced when the stored rsn is greater
 *    than the rsn of the header being written.
 *  - When ctdb->vnn_map is NULL the lmaster is not looked up; otherwise a
 *    chain of tests on data.dsize, the header flags and the lmaster /
 *    dmaster roles decides whether the record is kept or deleted.
 *    NOTE(review): most of the assignments made in those branches are not
 *    visible in this listing - confirm against the full source.
 *  - For a volatile database where this node is dmaster and no read-only
 *    flag is set, an empty record is scheduled for deletion;
 *    remove_from_delete_queue is derived as the negation of
 *    schedule_for_deletion.
 *  - If the database has seqnum updates enabled and the stored payload is
 *    byte-identical to the new one, TDB_SEQNUM is removed from the tdb
 *    flags for this write and added back afterwards.
 *  - The record is written with tdb_storev() (header + data, TDB_REPLACE)
 *    or removed with tdb_delete(), depending on "keep".
 *  - In the failure-logging path both scheduling flags are reset to
 *    false. Afterwards the record is handed to
 *    ctdb_local_schedule_for_deletion() or
 *    ctdb_local_remove_from_delete_queue() according to those flags.
 */
56 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
58 struct ctdb_ltdb_header *header,
61 struct ctdb_context *ctdb = ctdb_db->ctdb;
63 uint32_t hsize = sizeof(struct ctdb_ltdb_header);
65 bool seqnum_suppressed = false;
67 bool schedule_for_deletion = false;
68 bool remove_from_delete_queue = false;
71 if (ctdb->flags & CTDB_FLAG_TORTURE) {
73 struct ctdb_ltdb_header *h2;
75 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
76 h2 = (struct ctdb_ltdb_header *)old.dptr;
77 if (old.dptr != NULL &&
79 h2->rsn > header->rsn) {
81 ("RSN regression! %"PRIu64" %"PRIu64"\n",
82 h2->rsn, header->rsn));
89 if (ctdb->vnn_map == NULL) {
91 * Called from a client: always store the record
92 * Also don't call ctdb_lmaster since it uses the vnn_map!
98 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
101 * If we migrate an empty record off to another node
102 * and the record has not been migrated with data,
103 * delete the record instead of storing the empty record.
105 if (data.dsize != 0) {
107 } else if (header->flags & CTDB_REC_RO_FLAGS) {
109 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
111 * The record is not created by the client but
112 * automatically by the ctdb_ltdb_fetch logic that
113 * creates a record with an initial header in the
114 * ltdb before trying to migrate the record from
115 * the current lmaster. Keep it instead of trying
116 * to delete the non-existing record...
119 schedule_for_deletion = true;
120 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
122 } else if (ctdb_db->ctdb->pnn == lmaster) {
124 * If we are lmaster, then we usually keep the record.
125 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
126 * and the record is empty and has never been migrated
127 * with data, then we should delete it instead of storing it.
128 * This is part of the vacuuming process.
130 * The reason that we usually need to store even empty records
131 * on the lmaster is that a client operating directly on the
132 * lmaster (== dmaster) expects the local copy of the record to
133 * exist after successful ctdb migrate call. If the record does
134 * not exist, the client goes into a migrate loop and eventually
135 * fails. So storing the empty record makes sure that we do not
136 * need to change the client code.
138 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
140 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
143 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
148 if (ctdb_db_volatile(ctdb_db) &&
149 (ctdb_db->ctdb->pnn == header->dmaster) &&
150 !(header->flags & CTDB_REC_RO_FLAGS))
154 if (data.dsize == 0) {
155 schedule_for_deletion = true;
158 remove_from_delete_queue = !schedule_for_deletion;
163 * The VACUUM_MIGRATED flag is only set temporarily for
164 * the above logic when the record was retrieved by a
165 * VACUUM_MIGRATE call and should not be stored in the
168 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
169 * and there are two cases in which the corresponding record
170 * is stored in the local database:
171 * 1. The record has been migrated with data in the past
172 * (the MIGRATED_WITH_DATA record flag is set).
173 * 2. The record has been filled with data again since it
174 * had been submitted in the VACUUM_FETCH message to the
176 * For such records it is important to not store the
177 * VACUUM_MIGRATED flag in the database.
179 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
182 * Similarly, clear the AUTOMATIC flag which should not enter
183 * the local database copy since this would require client
184 * modifications to clear the flag when the client stores
187 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
189 rec[0].dsize = hsize;
190 rec[0].dptr = (uint8_t *)header;
192 rec[1].dsize = data.dsize;
193 rec[1].dptr = data.dptr;
195 /* Databases with seqnum updates enabled only get their seqnum
196 changes when/if we modify the data */
197 if (ctdb_db->seqnum_update != NULL) {
199 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
201 if ((old.dsize == hsize + data.dsize) &&
202 memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
203 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
204 seqnum_suppressed = true;
206 if (old.dptr != NULL) {
211 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
213 keep?"storing":"deleting",
217 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
219 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
226 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
231 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
234 keep?"store":"delete", ret,
235 tdb_errorstr(ctdb_db->ltdb->tdb)));
237 schedule_for_deletion = false;
238 remove_from_delete_queue = false;
240 if (seqnum_suppressed) {
241 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
244 if (schedule_for_deletion) {
246 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
248 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
252 if (remove_from_delete_queue) {
253 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
/*
 * State for a request that is parked until a record lock is obtained.
 * It is filled in by ctdb_ltdb_lock_requeue() and consumed by
 * lock_fetch_callback(), which hands hdr back to recv_pkt.
 * When ignore_generation is false the callback compares the database
 * generation recorded at queue time with the current one and discards
 * the packet on a mismatch.
 */
259 struct lock_fetch_state {
260 struct ctdb_context *ctdb;
261 struct ctdb_db_context *ctdb_db;
262 void (*recv_pkt)(void *, struct ctdb_req_header *);
264 struct ctdb_req_header *hdr;
266 bool ignore_generation;
270 called when we should retry the operation
/*
 * Callback passed to ctdb_lock_record() by ctdb_ltdb_lock_requeue().
 *
 * p is the struct lock_fetch_state created when the request was queued.
 * Unless ignore_generation is set, a packet that was queued under a
 * different database generation is logged as discarded and freed.
 * The packet is re-submitted by calling recv_pkt with recv_context and
 * the stored header.
 * NOTE(review): "locked" is not referenced in the lines shown here, and
 * the early return after the discard is not visible - confirm.
 */
272 static void lock_fetch_callback(void *p, bool locked)
274 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
275 if (!state->ignore_generation &&
276 state->generation != state->ctdb_db->generation) {
277 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
278 talloc_free(state->hdr);
281 state->recv_pkt(state->recv_context, state->hdr);
282 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
287 do a non-blocking ltdb_lock, deferring this ctdb request until we
290 It does the following:
292 1) tries to get the chainlock. If it succeeds, then it returns 0
294 2) if it fails to get a chainlock immediately then it sets up a
295 non-blocking chainlock via ctdb_lock_record, and when it gets the
296 chainlock it re-submits this ctdb request to the main packet
299 This effectively queues all ctdb requests that cannot be
300 immediately satisfied until it can get the lock. This means that
301 the main ctdb daemon will not block waiting for a chainlock held by
304 There are 3 possible return values:
306 0: means that it got the lock immediately.
307 -1: means that it failed to get the lock, and won't retry
308 -2: means that it failed to get the lock immediately, but will retry
/*
 * Parameters:
 *   ctdb_db:           database whose record chain is locked.
 *   key:               record key.
 *   hdr:               request packet; on the deferred path it is
 *                      talloc_steal()ed onto the retry state so that it
 *                      is not freed with the caller's temporary context.
 *   recv_pkt:          function used to re-submit hdr later.
 *   recv_context:      first argument passed to recv_pkt.
 *   ignore_generation: copied into the retry state; when false the retry
 *                      is dropped if ctdb_db->generation has changed.
 *
 * A failed tdb_chainlock_nonblock() is treated as a hard failure unless
 * errno is EACCES, EAGAIN or EDEADLK. With CTDB_FLAG_TORTURE set the
 * chain lock can be released again so that the contended path is
 * exercised.
 */
310 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
311 TDB_DATA key, struct ctdb_req_header *hdr,
312 void (*recv_pkt)(void *, struct ctdb_req_header *),
313 void *recv_context, bool ignore_generation)
316 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
317 struct lock_request *lreq;
318 struct lock_fetch_state *state;
320 ret = tdb_chainlock_nonblock(tdb, key);
323 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
324 /* a hard failure - don't try again */
328 /* when torturing, ensure we test the contended path */
329 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
332 tdb_chainunlock(tdb, key);
335 /* first the non-contended path */
/* NOTE(review): the talloc() result is dereferenced on the following
   line without a NULL check. */
340 state = talloc(hdr, struct lock_fetch_state);
341 state->ctdb = ctdb_db->ctdb;
342 state->ctdb_db = ctdb_db;
344 state->recv_pkt = recv_pkt;
345 state->recv_context = recv_context;
346 state->generation = ctdb_db->generation;
347 state->ignore_generation = ignore_generation;
349 /* now the contended path */
350 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
355 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
356 so it won't be freed yet */
357 talloc_steal(state, hdr);
359 /* now tell the caller that we will retry asynchronously */
364 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Takes the record lock through ctdb_ltdb_lock_requeue() (same
 * hdr/recv_pkt/recv_context/ignore_generation arguments) and reads the
 * record with ctdb_ltdb_fetch() into *header and *data, passing hdr as
 * the fourth argument of the fetch.
 * ctdb_ltdb_unlock() is called in the body and a failing unlock is
 * reported with DBG_ERR.
 * NOTE(review): the conditions guarding the fetch and the unlock are not
 * visible here - presumably the fetch only runs when the lock was taken
 * immediately and the unlock only when the fetch failed; confirm.
 */
366 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
367 TDB_DATA key, struct ctdb_ltdb_header *header,
368 struct ctdb_req_header *hdr, TDB_DATA *data,
369 void (*recv_pkt)(void *, struct ctdb_req_header *),
370 void *recv_context, bool ignore_generation)
374 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
375 recv_context, ignore_generation);
380 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
383 uret = ctdb_ltdb_unlock(ctdb_db, key);
385 DBG_ERR("ctdb_ltdb_unlock() failed with error %d\n",
394 paranoid check to see if the db is empty
/*
 * Runs tdb_traverse_read() over the local tdb with no callback and keeps
 * its result in count. The ALERT message and ctdb_fatal() below are the
 * reaction to a database that is not empty on attach.
 * NOTE(review): the test applied to count is not visible here.
 */
396 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
398 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
399 int count = tdb_traverse_read(tdb, NULL, NULL);
401 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
403 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Refresh ctdb_db->unhealthy_reason from the persistent-health tdb
 * (ctdb->db_persistent_health).
 *
 * The lookup key is the database name without its terminating NUL
 * (dsize = strlen(db_name)). The fetched value is copied into a talloc
 * string owned by ctdb_db using talloc_strndup(). If that allocation
 * fails, the previous reason pointer is put back.
 * NOTE(review): the return statements and what happens to the previous
 * reason string on success are not visible here; callers in this file
 * appear to treat the result as an error indicator.
 */
407 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
408 struct ctdb_db_context *ctdb_db)
410 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
416 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
417 key.dsize = strlen(ctdb_db->db_name);
419 old = ctdb_db->unhealthy_reason;
420 ctdb_db->unhealthy_reason = NULL;
422 val = tdb_fetch(tdb, key);
424 reason = talloc_strndup(ctdb_db,
425 (const char *)val.dptr,
427 if (reason == NULL) {
428 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
430 ctdb_db->unhealthy_reason = old;
441 ctdb_db->unhealthy_reason = reason;
/*
 * Record a new health state for a database in the persistent-health tdb.
 *
 * given_reason:      text describing why the database is unhealthy, or
 *                    NULL for healthy.
 * num_healthy_nodes: used in the branch that handles an existing reason:
 *                    when it is 0 the old reason is carried over through
 *                    a "%s%s" format together with a prefix, which can be
 *                    "NO-HEALTHY-NODES - " (strncmp() is used to test
 *                    whether the old reason already starts with it).
 *
 * The update runs inside a tdb transaction on the health tdb: the
 * current reason is loaded with ctdb_load_persistent_health(), the
 * record keyed by the database name (without terminating NUL) is then
 * replaced with tdb_store() or removed with tdb_delete(), and the
 * transaction is committed. Store and delete failures cancel the
 * transaction. After the commit the old reason string is freed and
 * ctdb_db->unhealthy_reason points at the new one (NULL when healthy).
 * NOTE(review): several guarding conditions and all return statements
 * are not visible in this listing.
 */
445 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
446 struct ctdb_db_context *ctdb_db,
447 const char *given_reason,/* NULL means healthy */
448 int num_healthy_nodes)
450 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
454 char *new_reason = NULL;
455 char *old_reason = NULL;
457 ret = tdb_transaction_start(tdb);
459 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
460 tdb_name(tdb), ret, tdb_errorstr(tdb)));
464 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
466 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
467 ctdb_db->db_name, ret));
470 old_reason = ctdb_db->unhealthy_reason;
472 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
473 key.dsize = strlen(ctdb_db->db_name);
476 new_reason = talloc_strdup(ctdb_db, given_reason);
477 if (new_reason == NULL) {
478 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
482 } else if (old_reason && num_healthy_nodes == 0) {
484 * If the reason indicates ok, but there where no healthy nodes
485 * available, that it means, we have not recovered valid content
486 * of the db. So if there's an old reason, prefix it with
487 * "NO-HEALTHY-NODES - "
491 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
492 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
494 prefix = _TMP_PREFIX;
498 new_reason = talloc_asprintf(ctdb_db, "%s%s",
500 if (new_reason == NULL) {
501 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
502 prefix, old_reason));
509 val.dptr = discard_const_p(uint8_t, new_reason);
510 val.dsize = strlen(new_reason);
512 ret = tdb_store(tdb, key, val, TDB_REPLACE);
514 tdb_transaction_cancel(tdb);
515 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
516 tdb_name(tdb), ctdb_db->db_name, new_reason,
517 ret, tdb_errorstr(tdb)));
518 talloc_free(new_reason);
521 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
522 ctdb_db->db_name, new_reason));
523 } else if (old_reason) {
524 ret = tdb_delete(tdb, key);
526 tdb_transaction_cancel(tdb);
527 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
528 tdb_name(tdb), ctdb_db->db_name,
529 ret, tdb_errorstr(tdb)));
530 talloc_free(new_reason);
533 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
537 ret = tdb_transaction_commit(tdb);
538 if (ret != TDB_SUCCESS) {
539 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
540 tdb_name(tdb), ret, tdb_errorstr(tdb)));
541 talloc_free(new_reason);
545 talloc_free(old_reason);
546 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move a corrupted database file out of the way.
 *
 * A target name ending in ".corrupted.<YYYYMMDDhhmmss>.0Z" is built from
 * the current time. The health record of the database is then set to
 * "ERROR - Backup of corrupted TDB in '...'" with 0 healthy nodes, and
 * ctdb_db->db_path is rename()d to the new name. The temporary strings
 * are allocated on ctdb_db and freed with talloc_free() along the way.
 * Every failure path logs at CRIT level.
 * NOTE(review): the return values are not visible in this listing.
 */
551 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
552 struct ctdb_db_context *ctdb_db)
554 time_t now = time(NULL);
562 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
563 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
564 "%04u%02u%02u%02u%02u%02u.0Z",
566 tm->tm_year+1900, tm->tm_mon+1,
567 tm->tm_mday, tm->tm_hour, tm->tm_min,
569 if (new_path == NULL) {
570 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
574 new_reason = talloc_asprintf(ctdb_db,
575 "ERROR - Backup of corrupted TDB in '%s'",
577 if (new_reason == NULL) {
578 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
581 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
582 talloc_free(new_reason);
584 DEBUG(DEBUG_CRIT,(__location__
585 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
590 ret = rename(ctdb_db->db_path, new_path);
592 DEBUG(DEBUG_CRIT,(__location__
593 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
594 ctdb_db->db_path, new_path,
595 errno, strerror(errno)));
596 talloc_free(new_path);
600 DEBUG(DEBUG_CRIT,(__location__
601 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
602 ctdb_db->db_path, new_path));
603 talloc_free(new_path);
/*
 * Walk ctdb->db_list and reload the stored health state of each
 * persistent database with ctdb_load_persistent_health(). A database
 * whose unhealthy_reason is NULL is logged as healthy (INFO); otherwise
 * it is logged as unhealthy together with the reason (ALERT). A summary
 * line with OK/FAIL counters is logged as well.
 * NOTE(review): the handling of non-persistent entries (presumably
 * skipped), the counter updates and the return value are not visible
 * here.
 */
607 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
609 struct ctdb_db_context *ctdb_db;
614 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
615 if (!ctdb_db_persistent(ctdb_db)) {
619 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
621 DEBUG(DEBUG_ALERT,(__location__
622 " load persistent health for '%s' failed\n",
627 if (ctdb_db->unhealthy_reason == NULL) {
629 DEBUG(DEBUG_INFO,(__location__
630 " persistent db '%s' healthy\n",
636 DEBUG(DEBUG_ALERT,(__location__
637 " persistent db '%s' unhealthy: %s\n",
639 ctdb_db->unhealthy_reason));
642 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
654 mark a database - as healthy
/*
 * Control handler: mark a database as healthy.
 *
 * indata.dptr is read as a uint32_t database id and looked up with
 * find_ctdb_db(). The health record is updated with a NULL reason and
 * one healthy node. When may_recover is set and the daemon is in
 * CTDB_RUNSTATE_STARTUP, recovery mode is switched to
 * CTDB_RECOVERY_ACTIVE.
 * NOTE(review): indata.dsize is not checked in the lines shown, and the
 * assignment to may_recover inside the unhealthy_reason test is not
 * visible - confirm against the full source.
 */
656 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
658 uint32_t db_id = *(uint32_t *)indata.dptr;
659 struct ctdb_db_context *ctdb_db;
661 bool may_recover = false;
663 ctdb_db = find_ctdb_db(ctdb, db_id);
665 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
669 if (ctdb_db->unhealthy_reason) {
673 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
675 DEBUG(DEBUG_ERR,(__location__
676 " ctdb_update_persistent_health(%s) failed\n",
681 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
682 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
684 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler: report the health of a database.
 *
 * indata.dptr is read as a uint32_t database id. The stored health state
 * is re-read with ctdb_load_persistent_health(). For an unhealthy
 * database, outdata is pointed at ctdb_db->unhealthy_reason itself (no
 * copy is made here) and dsize includes the terminating NUL.
 * NOTE(review): the healthy case and the return values are not visible
 * in this listing.
 */
690 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
694 uint32_t db_id = *(uint32_t *)indata.dptr;
695 struct ctdb_db_context *ctdb_db;
698 ctdb_db = find_ctdb_db(ctdb, db_id);
700 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
704 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
706 DEBUG(DEBUG_ERR,(__location__
707 " ctdb_load_persistent_health(%s) failed\n",
713 if (ctdb_db->unhealthy_reason) {
714 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
715 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
/*
 * Enable the read-only property on a database.
 *
 * Non-volatile databases are rejected with a log message. A tracking
 * tdb named "<db_path>.RO" is opened with O_CREAT|O_RDWR, mode 0600,
 * the flags TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC and the
 * database_hash_size tunable; it is stored in ctdb_db->rottdb. The
 * database is then flagged through ctdb_db_set_readonly().
 * NOTE(review): what happens for an already read-only database
 * (presumably an early return) and the return values are not visible
 * here.
 */
722 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
726 if (ctdb_db_readonly(ctdb_db)) {
730 if (! ctdb_db_volatile(ctdb_db)) {
732 ("Non-volatile databases do not support readonly flag\n"));
736 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
737 if (ropath == NULL) {
738 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
741 ctdb_db->rottdb = tdb_open(ropath,
742 ctdb->tunable.database_hash_size,
743 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
744 O_CREAT|O_RDWR, 0600);
745 if (ctdb_db->rottdb == NULL) {
746 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
751 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
753 ctdb_db_set_readonly(ctdb_db);
755 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
762 attach to a database, handling both persistent and non-persistent databases
763 return 0 on success, -1 on failure
/*
 * Parameters:
 *   db_name:          database name; db_id is ctdb_hash() over the name
 *                     including its terminating NUL.
 *   db_flags:         stored in ctdb_db->db_flags and used to derive the
 *                     tdb open flags.
 *   unhealthy_reason: for persistent databases a non-NULL reason is
 *                     written to the health tdb (with 0 healthy nodes)
 *                     before the database is opened.
 *
 * Steps visible in the body:
 *  - allocate a zeroed ctdb_db_context on ctdb; for volatile databases a
 *    delete_queue tree is created, and ctdb_ltdb_store_server is
 *    installed as the store function
 *  - reject a db_id that collides with a database already on
 *    ctdb->db_list
 *  - persistent databases: load the health state and refuse to attach a
 *    database marked unhealthy when remaining_tries is 0; otherwise only
 *    warn
 *  - open the tdb with tdb_wrap_open(); for persistent databases a
 *    failed open or a failed tdb_check() can lead to
 *    ctdb_backup_corrupted_tdb()
 *  - non-persistent databases go through ctdb_check_db_empty()
 *  - create the deferred_fetch and defer_dmaster trees, add the database
 *    to ctdb->db_list, set the tdb max-dead tunable and register the
 *    null, fetch and fetch_with_header calls
 *  - initialise vacuuming, migration tracking and the lock_log hash
 *  - copy the generation from ctdb->vnn_map
 * Failure paths release the context with talloc_free(ctdb_db).
 * NOTE(review): the retry loop around the open and several guarding
 * conditions are not visible in this listing.
 */
765 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
766 uint8_t db_flags, const char *unhealthy_reason)
768 struct ctdb_db_context *ctdb_db, *tmp_db;
773 int remaining_tries = 0;
775 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
776 CTDB_NO_MEMORY(ctdb, ctdb_db);
778 ctdb_db->ctdb = ctdb;
779 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
780 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
782 key.dsize = strlen(db_name)+1;
783 key.dptr = discard_const(db_name);
784 ctdb_db->db_id = ctdb_hash(&key);
785 ctdb_db->db_flags = db_flags;
787 if (ctdb_db_volatile(ctdb_db)) {
788 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
789 if (ctdb_db->delete_queue == NULL) {
790 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
793 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
796 /* check for hash collisions */
797 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
798 if (tmp_db->db_id == ctdb_db->db_id) {
799 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
800 tmp_db->db_id, db_name, tmp_db->db_name));
801 talloc_free(ctdb_db);
806 if (ctdb_db_persistent(ctdb_db)) {
807 if (unhealthy_reason) {
808 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
809 unhealthy_reason, 0);
811 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
812 ctdb_db->db_name, unhealthy_reason, ret));
813 talloc_free(ctdb_db);
818 if (ctdb->max_persistent_check_errors > 0) {
821 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
825 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
827 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
828 ctdb_db->db_name, ret));
829 talloc_free(ctdb_db);
834 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
835 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
836 ctdb_db->db_name, ctdb_db->unhealthy_reason));
837 talloc_free(ctdb_db);
841 if (ctdb_db->unhealthy_reason) {
842 /* this is just a warning, but we want that in the log file! */
843 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
844 ctdb_db->db_name, ctdb_db->unhealthy_reason));
847 /* open the database */
848 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
849 ctdb_db_persistent(ctdb_db) ?
850 ctdb->db_directory_persistent :
854 tdb_flags = ctdb_db_tdb_flags(db_flags,
856 ctdb_config.tdb_mutexes);
859 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
860 ctdb->tunable.database_hash_size,
862 O_CREAT|O_RDWR, mode);
863 if (ctdb_db->ltdb == NULL) {
865 int saved_errno = errno;
867 if (! ctdb_db_persistent(ctdb_db)) {
868 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
871 strerror(saved_errno)));
872 talloc_free(ctdb_db);
876 if (remaining_tries == 0) {
877 DEBUG(DEBUG_CRIT,(__location__
878 "Failed to open persistent tdb '%s': %d - %s\n",
881 strerror(saved_errno)));
882 talloc_free(ctdb_db);
886 ret = stat(ctdb_db->db_path, &st);
888 DEBUG(DEBUG_CRIT,(__location__
889 "Failed to open persistent tdb '%s': %d - %s\n",
892 strerror(saved_errno)));
893 talloc_free(ctdb_db);
897 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
899 DEBUG(DEBUG_CRIT,(__location__
900 "Failed to open persistent tdb '%s': %d - %s\n",
903 strerror(saved_errno)));
904 talloc_free(ctdb_db);
913 if (!ctdb_db_persistent(ctdb_db)) {
914 ctdb_check_db_empty(ctdb_db);
916 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
921 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
922 ctdb_db->db_path, ret,
923 tdb_errorstr(ctdb_db->ltdb->tdb)));
924 if (remaining_tries == 0) {
925 talloc_free(ctdb_db);
929 fd = tdb_fd(ctdb_db->ltdb->tdb);
930 ret = fstat(fd, &st);
932 DEBUG(DEBUG_CRIT,(__location__
933 "Failed to fstat() persistent tdb '%s': %d - %s\n",
937 talloc_free(ctdb_db);
942 talloc_free(ctdb_db->ltdb);
943 ctdb_db->ltdb = NULL;
945 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
947 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
949 talloc_free(ctdb_db);
959 /* remember the flags the client has specified */
960 tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
963 /* set up a rb tree we can use to track which records we have a
964 fetch-lock in-flight for so we can defer any additional calls
967 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
968 if (ctdb_db->deferred_fetch == NULL) {
969 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
970 talloc_free(ctdb_db);
974 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
975 if (ctdb_db->defer_dmaster == NULL) {
976 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
978 talloc_free(ctdb_db);
982 DLIST_ADD(ctdb->db_list, ctdb_db);
984 /* setting this can help some high churn databases */
985 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
988 all databases support the "null" function. we need this in
989 order to do forced migration of records
991 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
993 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
994 talloc_free(ctdb_db);
999 all databases support the "fetch" function. we need this
1000 for efficient Samba3 ctdb fetch
1002 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1004 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1005 talloc_free(ctdb_db);
1010 all databases support the "fetch_with_header" function. we need this
1011 for efficient readonly record fetches
1013 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1015 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1016 talloc_free(ctdb_db);
1020 ret = ctdb_vacuum_init(ctdb_db);
1022 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1023 "database '%s'\n", ctdb_db->db_name));
1024 talloc_free(ctdb_db);
1028 ret = ctdb_migration_init(ctdb_db);
1031 ("Failed to setup migration tracking for db '%s'\n",
1033 talloc_free(ctdb_db);
1037 ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1038 &ctdb_db->lock_log);
1041 ("Failed to setup lock logging for db '%s'\n",
1043 talloc_free(ctdb_db);
1047 ctdb_db->generation = ctdb->vnn_map->generation;
1049 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1050 ctdb_db->db_path, tdb_flags));
/*
 * One DB-attach control that has been parked for later processing.
 * Entries are linked on ctdb->deferred_attach through next/prev and keep
 * the original control packet in c.
 */
1057 struct ctdb_deferred_attach_context {
1058 struct ctdb_deferred_attach_context *next, *prev;
1059 struct ctdb_context *ctdb;
1060 struct ctdb_req_control_old *c;
/*
 * talloc destructor installed by ctdb_control_db_attach(): unlinks the
 * context from the ctdb->deferred_attach list when it is freed.
 */
1064 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1066 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
 * Timer handler armed when an attach request is deferred.
 * private_data is the struct ctdb_deferred_attach_context. The parked
 * control is answered with status -1 (no data, no error message) and the
 * deferral context is freed.
 */
1071 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1072 struct tevent_timer *te,
1073 struct timeval t, void *private_data)
1075 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1076 struct ctdb_context *ctdb = da_ctx->ctdb;
1078 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1079 talloc_free(da_ctx);
/*
 * Timer handler scheduled by ctdb_process_deferred_attach().
 * private_data is the struct ctdb_deferred_attach_context. The parked
 * control packet is fed back into ctdb_input_pkt() and the deferral
 * context is freed afterwards.
 */
1082 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1083 struct tevent_timer *te,
1084 struct timeval t, void *private_data)
1086 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1087 struct ctdb_context *ctdb = da_ctx->ctdb;
1089 /* This talloc-steals the packet ->c */
1090 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1091 talloc_free(da_ctx);
/*
 * Re-queue every deferred attach request: each entry is taken off the
 * ctdb->deferred_attach list and a timer at timeval_current_ofs(1,0) is
 * added on ctdb->ev (owned by the entry) that runs
 * ctdb_deferred_attach_callback for it.
 * NOTE(review): the result of tevent_add_timer() is not checked in the
 * lines shown; the return value is not visible here.
 */
1094 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1096 struct ctdb_deferred_attach_context *da_ctx;
1098 /* call it from the main event loop as soon as the current event
1101 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1102 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1103 tevent_add_timer(ctdb->ev, da_ctx,
1104 timeval_current_ofs(1,0),
1105 ctdb_deferred_attach_callback, da_ctx);
1112 a client has asked to attach a new database
/*
 * Control handler for database attach requests.
 *
 * indata.dptr is used as the database name; c is the control packet.
 *
 * Visible behaviour:
 *  - the request is denied when the allow_client_db_attach tunable is 0
 *  - for a request from a local client (srcnode is this node, client_id
 *    is non-zero and the client is found): it is refused while the node
 *    has NODE_FLAGS_INACTIVE set; while recovery is active, the client
 *    is not the recovery daemon and the runstate is below RUNNING, the
 *    control packet is moved onto a deferral context, put on
 *    ctdb->deferred_attach with a timeout timer taken from the
 *    deferred_attach_timeout tunable, and *async_reply is set
 *  - if a database of that name already exists, the requested flags must
 *    all be present in its db_flags; outdata then points at its db_id
 *  - otherwise the database is attached with ctdb_local_attach(), looked
 *    up again, outdata is pointed at its db_id, lockdown_memory() is
 *    called, and the matching attach control (persistent, replicated or
 *    plain) is broadcast to connected nodes with CTDB_CTRL_FLAG_NOREPLY
 * outdata->dptr refers to the db_id field inside the database context,
 * not to a copy.
 * NOTE(review): part of the parameter list and all return statements are
 * not visible in this listing.
 */
1114 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb,
1120 struct ctdb_req_control_old *c,
1123 const char *db_name = (const char *)indata.dptr;
1124 struct ctdb_db_context *db;
1125 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1126 struct ctdb_client *client = NULL;
1129 if (ctdb->tunable.allow_client_db_attach == 0) {
1130 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1131 "AllowClientDBAccess == 0\n", db_name));
1135 /* don't allow any local clients to attach while we are in recovery mode
1136 * except for the recovery daemon.
1137 * allow all attach from the network since these are always from remote
1140 if (srcnode == ctdb->pnn && client_id != 0) {
1141 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1143 if (client != NULL) {
1144 /* If the node is inactive it is not part of the cluster
1145 and we should not allow clients to attach to any
1148 if (node->flags & NODE_FLAGS_INACTIVE) {
1149 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1153 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1154 client->pid != ctdb->recoverd_pid &&
1155 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1156 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1158 if (da_ctx == NULL) {
1159 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1163 da_ctx->ctdb = ctdb;
1164 da_ctx->c = talloc_steal(da_ctx, c);
1165 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1166 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1168 tevent_add_timer(ctdb->ev, da_ctx,
1169 timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1170 ctdb_deferred_attach_timeout, da_ctx);
1172 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1173 *async_reply = true;
1178 /* see if we already have this name */
1179 db = ctdb_db_handle(ctdb, db_name);
1181 if ((db->db_flags & db_flags) != db_flags) {
1183 ("Error: Failed to re-attach with 0x%x flags,"
1184 " database has 0x%x flags\n", db_flags,
1188 outdata->dptr = (uint8_t *)&db->db_id;
1189 outdata->dsize = sizeof(db->db_id);
1193 if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
1197 db = ctdb_db_handle(ctdb, db_name);
1199 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1203 outdata->dptr = (uint8_t *)&db->db_id;
1204 outdata->dsize = sizeof(db->db_id);
1206 /* Try to ensure it's locked in mem */
1207 lockdown_memory(ctdb->valgrinding);
1209 if (ctdb_db_persistent(db)) {
1210 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1211 } else if (ctdb_db_replicated(db)) {
1212 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1214 opcode = CTDB_CONTROL_DB_ATTACH;
1217 /* tell all the other nodes about this database */
1218 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, opcode,
1219 0, CTDB_CTRL_FLAG_NOREPLY,
1220 indata, NULL, NULL);
1227 * a client has asked to detach from a database
/*
 * Control handler for database detach requests.
 *
 * indata.dptr is read as a uint32_t database id.
 *
 * Visible behaviour:
 *  - an unknown id, allow_client_db_attach == 1, a non-volatile database
 *    or an active recovery each cause the request to be rejected with a
 *    log message
 *  - when client_id is non-zero and the client is found, the control is
 *    forwarded to connected nodes (CTDB_BROADCAST_CONNECTED, NOREPLY);
 *    when the client cannot be found the detach is failed with
 *    "Client has gone away"
 *  - the recovery daemon is notified with a CTDB_SRVID_DETACH_DATABASE
 *    message
 *  - the vacuum handle, delete queue, deferred-fetch tree, active
 *    traverses, active revoke children and (for read-only databases) the
 *    tracking tdb are freed; the database is removed from ctdb->db_list
 *    and its context is freed
 * NOTE(review): the return statements, and therefore where the
 * client-originated path ends, are not visible in this listing.
 */
1229 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1233 struct ctdb_db_context *ctdb_db;
1234 struct ctdb_client *client = NULL;
1236 db_id = *(uint32_t *)indata.dptr;
1237 ctdb_db = find_ctdb_db(ctdb, db_id);
1238 if (ctdb_db == NULL) {
1239 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1244 if (ctdb->tunable.allow_client_db_attach == 1) {
1245 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1246 "Clients are allowed access to databases "
1247 "(AllowClientDBAccess == 1)\n",
1252 if (! ctdb_db_volatile(ctdb_db)) {
1254 ("Detaching non-volatile database %s denied\n",
1259 /* Cannot detach from database when in recovery */
1260 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1261 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1265 /* If a control comes from a client, then broadcast it to all nodes.
1266 * Do the actual detach only if the control comes from other daemons.
1268 if (client_id != 0) {
1269 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1270 if (client != NULL) {
1271 /* forward the control to all the nodes */
1272 ctdb_daemon_send_control(ctdb,
1273 CTDB_BROADCAST_CONNECTED, 0,
1274 CTDB_CONTROL_DB_DETACH, 0,
1275 CTDB_CTRL_FLAG_NOREPLY,
1276 indata, NULL, NULL);
1279 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1280 "for database '%s'\n", ctdb_db->db_name));
1284 /* Detach database from recoverd */
1285 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1286 CTDB_SRVID_DETACH_DATABASE,
1288 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1292 /* Disable vacuuming and drop all vacuuming data */
1293 talloc_free(ctdb_db->vacuum_handle);
1294 talloc_free(ctdb_db->delete_queue);
1296 /* Terminate any deferred fetch */
1297 talloc_free(ctdb_db->deferred_fetch);
1299 /* Terminate any traverses */
1300 while (ctdb_db->traverse) {
1301 talloc_free(ctdb_db->traverse);
1304 /* Terminate any revokes */
1305 while (ctdb_db->revokechild_active) {
1306 talloc_free(ctdb_db->revokechild_active);
1309 /* Free readonly tracking database */
1310 if (ctdb_db_readonly(ctdb_db)) {
1311 talloc_free(ctdb_db->rottdb);
1314 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1316 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1318 talloc_free(ctdb_db);
1324 attach to all existing persistent databases
/*
 * Scan ctdb->db_directory_persistent and attach the databases found.
 *
 * unhealthy_reason is passed through to ctdb_local_attach() for every
 * database that is attached.
 *
 * Each directory entry name is duplicated onto ctdb. An entry is only
 * considered when it is at least 7 characters long and contains
 * ".tdb."; the text after ".tdb." is parsed with sscanf("%u") and must
 * equal ctdb->pnn, and a digits-only scan can mark the name invalid.
 * Other entries are logged as ignored. Accepted entries are attached
 * with CTDB_DB_FLAGS_PERSISTENT.
 * NOTE(review): where the digit scan starts, how the name is trimmed
 * before attaching, the cleanup of the duplicated name and the return
 * values are not visible in this listing.
 */
1326 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1327 const char *unhealthy_reason)
1332 /* open the persistent db directory and scan it for files */
1333 d = opendir(ctdb->db_directory_persistent);
1338 while ((de=readdir(d))) {
1340 size_t len = strlen(de->d_name);
1342 int invalid_name = 0;
1344 s = talloc_strdup(ctdb, de->d_name);
1347 CTDB_NO_MEMORY(ctdb, s);
1350 /* only accept names ending in .tdb */
1351 p = strstr(s, ".tdb.");
1352 if (len < 7 || p == NULL) {
1357 /* only accept names ending with .tdb. and any number of digits */
1359 while (*q != 0 && invalid_name == 0) {
1360 if (!isdigit(*q++)) {
1364 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1365 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1371 if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
1372 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1378 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1386 int ctdb_attach_databases(struct ctdb_context *ctdb)
1389 char *persistent_health_path = NULL;
1390 char *unhealthy_reason = NULL;
1391 bool first_try = true;
1393 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1394 ctdb->db_directory_state,
1395 PERSISTENT_HEALTH_TDB,
1397 if (persistent_health_path == NULL) {
1398 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1404 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1405 0, TDB_DISALLOW_NESTING,
1406 O_CREAT | O_RDWR, 0600);
1407 if (ctdb->db_persistent_health == NULL) {
1408 struct tdb_wrap *tdb;
1411 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1412 persistent_health_path,
1415 talloc_free(persistent_health_path);
1416 talloc_free(unhealthy_reason);
1421 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1422 persistent_health_path,
1423 "was cleared after a failure",
1424 "manual verification needed");
1425 if (unhealthy_reason == NULL) {
1426 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1427 talloc_free(persistent_health_path);
1431 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1432 persistent_health_path));
1433 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1434 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1435 O_CREAT | O_RDWR, 0600);
1437 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1438 persistent_health_path,
1441 talloc_free(persistent_health_path);
1442 talloc_free(unhealthy_reason);
1449 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1451 struct tdb_wrap *tdb;
1453 talloc_free(ctdb->db_persistent_health);
1454 ctdb->db_persistent_health = NULL;
1457 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1458 persistent_health_path));
1459 talloc_free(persistent_health_path);
1460 talloc_free(unhealthy_reason);
1465 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1466 persistent_health_path,
1467 "was cleared after a failure",
1468 "manual verification needed");
1469 if (unhealthy_reason == NULL) {
1470 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1471 talloc_free(persistent_health_path);
1475 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1476 persistent_health_path));
1477 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1478 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1479 O_CREAT | O_RDWR, 0600);
1481 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1482 persistent_health_path,
1485 talloc_free(persistent_health_path);
1486 talloc_free(unhealthy_reason);
1493 talloc_free(persistent_health_path);
1495 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1496 talloc_free(unhealthy_reason);
1505 called when a broadcast seqnum update comes in
1507 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1509 struct ctdb_db_context *ctdb_db;
1510 if (srcnode == ctdb->pnn) {
1511 /* don't update ourselves! */
1515 ctdb_db = find_ctdb_db(ctdb, db_id);
1517 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1521 if (ctdb_db->unhealthy_reason) {
1522 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1523 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1527 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1528 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1533 timer to check for seqnum changes in a ltdb and propagate them
1535 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1536 struct tevent_timer *te,
1537 struct timeval t, void *p)
1539 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1540 struct ctdb_context *ctdb = ctdb_db->ctdb;
1541 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1542 if (new_seqnum != ctdb_db->seqnum) {
1543 /* something has changed - propagate it */
1545 data.dptr = (uint8_t *)&ctdb_db->db_id;
1546 data.dsize = sizeof(uint32_t);
1547 ctdb_daemon_send_control(ctdb,
1548 CTDB_BROADCAST_ACTIVE,
1550 CTDB_CONTROL_UPDATE_SEQNUM,
1552 CTDB_CTRL_FLAG_NOREPLY,
1557 ctdb_db->seqnum = new_seqnum;
1559 /* setup a new timer */
1560 ctdb_db->seqnum_update =
1561 tevent_add_timer(ctdb->ev, ctdb_db,
1562 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1563 (ctdb->tunable.seqnum_interval%1000)*1000),
1564 ctdb_ltdb_seqnum_check, ctdb_db);
1568 enable seqnum handling on this db
1570 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1572 struct ctdb_db_context *ctdb_db;
1573 ctdb_db = find_ctdb_db(ctdb, db_id);
1575 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1579 if (ctdb_db->seqnum_update == NULL) {
1580 ctdb_db->seqnum_update = tevent_add_timer(
1582 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1583 (ctdb->tunable.seqnum_interval%1000)*1000),
1584 ctdb_ltdb_seqnum_check, ctdb_db);
1587 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1588 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1592 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1594 if (ctdb_db_sticky(ctdb_db)) {
1598 if (! ctdb_db_volatile(ctdb_db)) {
1600 ("Non-volatile databases do not support sticky flag\n"));
1604 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1606 ctdb_db_set_sticky(ctdb_db);
1608 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1613 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1615 struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1618 for (i=0; i<MAX_HOT_KEYS; i++) {
1619 if (s->hot_keys[i].key.dsize > 0) {
1620 talloc_free(s->hot_keys[i].key.dptr);
1624 ZERO_STRUCT(ctdb_db->statistics);
1627 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1631 struct ctdb_db_context *ctdb_db;
1632 struct ctdb_db_statistics_old *stats;
1637 ctdb_db = find_ctdb_db(ctdb, db_id);
1639 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1643 len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1644 for (i = 0; i < MAX_HOT_KEYS; i++) {
1645 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1648 stats = talloc_size(outdata, len);
1649 if (stats == NULL) {
1650 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1654 memcpy(stats, &ctdb_db->statistics,
1655 offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1657 stats->num_hot_keys = MAX_HOT_KEYS;
1659 ptr = &stats->hot_keys_wire[0];
1660 for (i = 0; i < MAX_HOT_KEYS; i++) {
1661 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1662 ctdb_db->statistics.hot_keys[i].key.dsize);
1663 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1666 outdata->dptr = (uint8_t *)stats;
1667 outdata->dsize = len;