4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
/*
  Bookkeeping for one persistent commit that has been forwarded to
  other nodes and is waiting for their replies.

  NOTE(review): the embedded line numbers jump from 31 to 35, so some
  members are not visible in this extract; the functions below also
  use state->status, state->errormsg and state->num_pending.
 */
29 struct ctdb_persistent_state {
/* daemon context used when the reply is finally sent */
30 struct ctdb_context *ctdb;
/* the client control that is answered once the outcome is known */
31 struct ctdb_req_control *c;
/* compared against each other to tell "all failed" from "some failed" */
35 uint32_t num_failed, num_sent;
39 1) all nodes fail, and all nodes reply
40 2) some nodes fail, all nodes reply
46 called when a node has acknowledged a ctdb_control_update_record call
/*
  Reply handler passed to ctdb_daemon_send_control() for every
  CTDB_CONTROL_UPDATE_RECORD sent by the commit functions below.

  ctdb         - daemon context; its recovery_mode is checked first
  status       - status reported for the remote control
  data         - reply payload; not used by the visible lines
  private_data - the struct ctdb_persistent_state of this commit

  Once state->num_pending is zero the original control is answered
  with CTDB_TRANS2_COMMIT_ALLFAIL when num_failed equals num_sent,
  CTDB_TRANS2_COMMIT_SOMEFAIL when num_failed is non-zero, and
  CTDB_TRANS2_COMMIT_SUCCESS otherwise; state->errormsg is passed
  along as the error text.

  NOTE(review): several source lines of this function (remaining
  parameters, the status test, the counter updates, return and
  cleanup) are missing from this extract.
 */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49 int32_t status, TDB_DATA data,
53 struct ctdb_persistent_state *state = talloc_get_type(private_data,
54 struct ctdb_persistent_state);
/* replies that arrive while recovery is active are logged; presumably
   the function returns here - the return is not visible */
56 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
57 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
58 "during recovery\n"));
63 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
65 state->status = status;
66 state->errormsg = errormsg;
/* last outstanding reply: classify the overall outcome and answer */
70 if (state->num_pending == 0) {
71 enum ctdb_trans2_commit_error etype;
72 if (state->num_failed == state->num_sent) {
73 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
74 } else if (state->num_failed != 0) {
75 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
77 etype = CTDB_TRANS2_COMMIT_SUCCESS;
79 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
85 called if persistent store times out
/*
  Timer callback armed with event_add_timed() by the commit functions
  below, using ctdb->tunable.control_timeout.

  ev, te, t    - timer arguments; not used by the visible lines
  private_data - the struct ctdb_persistent_state of the commit

  Answers the original control with CTDB_TRANS2_COMMIT_TIMEOUT and a
  fixed error string. While recovery is active the timeout is only
  logged (presumably followed by a return that is not visible here).
 */
87 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te,
88 struct timeval t, void *private_data)
90 struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
92 if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
93 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
94 "timeout during recovery\n"));
98 ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT,
99 "timeout in ctdb_persistent_state");
105 store a set of persistent records - called from a ctdb client when it has updated
106 some records in a persistent database. The client will have the record
107 locked for the duration of this call. The client is the dmaster when
/*
  Handle a persistent commit request from a local client by forwarding
  recdata as CTDB_CONTROL_UPDATE_RECORD to the other active nodes in
  the vnn map and waiting for their replies.

  ctdb        - daemon context
  c           - the incoming control; c->client_id identifies the client
  recdata     - marshall buffer (struct ctdb_marshall_buffer); m->db_id
                names the database
  async_reply - presumably set when the reply is deferred until the
                remote nodes answer; the assignment is not visible in
                this extract - TODO confirm

  Error conditions logged by the visible code: unknown database,
  unknown client, database flagged unhealthy, a transaction already
  active on the database, a client db_id mismatch, and recovery being
  active. The matching return statements are not visible here.

  NOTE(review): m->db_id is read from recdata.dptr without a visible
  check that recdata.dsize covers the marshall buffer header.
  NOTE(review): many short source lines (braces, returns, the switch
  header, break statements) are missing from this extract.
 */
110 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
111 struct ctdb_req_control *c,
112 TDB_DATA recdata, bool *async_reply)
114 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
115 struct ctdb_persistent_state *state;
117 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
118 struct ctdb_db_context *ctdb_db;
/* look up the database named in the marshall buffer */
120 ctdb_db = find_ctdb_db(ctdb, m->db_id);
121 if (ctdb_db == NULL) {
122 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
123 "Unknown database db_id[0x%08x]\n", m->db_id));
/* the per-client counters below need a valid client record */
127 if (client == NULL) {
128 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
/* a database with an unhealthy_reason set is reported as an error */
132 if (ctdb_db->unhealthy_reason) {
133 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
134 ctdb_db->db_name, ctdb_db->unhealthy_reason));
138 /* handling num_persistent_updates is a bit strange -
140 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
141 They don't expect num_persistent_updates to be used at all
143 2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
144 this commit to then decrement it
146 3) new clients which use TRANS2 commit functions, and
147 expect this function to increment the counter, and
148 then have it decremented in ctdb_control_trans2_error
149 or ctdb_control_trans2_finished
152 case CTDB_CONTROL_PERSISTENT_STORE:
153 if (ctdb_db->transaction_active) {
154 DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
155 "transaction is active on database "
156 "db_id[0x%08x] - refusing persistent "
157 " store for client id[0x%08x]\n",
158 ctdb_db->db_id, client->client_id));
161 if (client->num_persistent_updates > 0) {
162 client->num_persistent_updates--;
165 case CTDB_CONTROL_TRANS2_COMMIT:
166 if (ctdb_db->transaction_active) {
167 DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
168 " already a transaction commit "
169 "active on db_id[0x%08x] - forbidding "
170 "client_id[0x%08x] to commit\n",
171 ctdb_db->db_id, client->client_id));
174 if (client->db_id != 0) {
175 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
176 "client-db_id[0x%08x] != 0 "
177 "(client_id[0x%08x])\n",
178 client->db_id, client->client_id));
181 client->num_persistent_updates++;
182 ctdb_db->transaction_active = true;
183 client->db_id = m->db_id;
184 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
185 " commit transaction on db id[0x%08x]\n",
186 client->client_id, client->db_id));
188 case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
189 /* already updated from the first commit */
190 if (client->db_id != m->db_id) {
191 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
192 "retry: client-db_id[0x%08x] != "
193 "db_id[0x%08x] (client_id[0x%08x])\n",
195 m->db_id, client->client_id));
198 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
199 "transaction commit retry on "
201 client->client_id, client->db_id));
/* commits are not accepted while recovery is active */
205 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
206 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
/* zero-initialised per-commit state, allocated as a talloc child of ctdb */
210 state = talloc_zero(ctdb, struct ctdb_persistent_state);
211 CTDB_NO_MEMORY(ctdb, state);
/* walk every entry of the vnn map and forward the update */
216 for (i=0;i<ctdb->vnn_map->size;i++) {
217 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
220 /* only send to active nodes */
221 if (node->flags & NODE_FLAGS_INACTIVE) {
225 /* don't send to ourselves */
226 if (node->pnn == ctdb->pnn) {
230 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
231 c->client_id, 0, recdata,
232 ctdb_persistent_callback, state);
234 DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
239 state->num_pending++;
/* nothing was sent, so there is no reply to wait for */
243 if (state->num_pending == 0) {
248 /* we need to wait for the replies */
251 /* need to keep the control structure around */
252 talloc_steal(state, c);
254 /* but we won't wait forever */
255 event_add_timed(ctdb->ev, state,
256 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
257 ctdb_persistent_store_timeout, state);
264 * Store a set of persistent records.
265 * This is used to roll out a transaction to all nodes.
/*
  Forward a marshalled set of persistent records as
  CTDB_CONTROL_UPDATE_RECORD to the active nodes in the vnn map and
  wait for their replies.

  ctdb        - daemon context
  c           - the incoming control; c->client_id identifies the client
  recdata     - marshall buffer (struct ctdb_marshall_buffer); m->db_id
                names the database
  async_reply - presumably set when the reply is deferred; the
                assignment is not visible in this extract - TODO confirm

  Error conditions logged by the visible code: recovery being active,
  unknown database and unknown client.

  NOTE(review): unlike ctdb_control_trans2_commit no check that skips
  the local node is visible in this loop - confirm that sending to
  ourselves is intended.
  NOTE(review): m->db_id is read from recdata.dptr without a visible
  check of recdata.dsize.
 */
267 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
268 struct ctdb_req_control *c,
269 TDB_DATA recdata, bool *async_reply)
271 struct ctdb_client *client;
272 struct ctdb_persistent_state *state;
274 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
275 struct ctdb_db_context *ctdb_db;
/* commits are not accepted while recovery is active */
277 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
278 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
282 ctdb_db = find_ctdb_db(ctdb, m->db_id);
283 if (ctdb_db == NULL) {
284 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
285 "Unknown database db_id[0x%08x]\n", m->db_id));
289 client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
290 if (client == NULL) {
291 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
292 "to a client. Returning error\n"));
/* zero-initialised per-commit state, allocated as a talloc child of ctdb */
296 state = talloc_zero(ctdb, struct ctdb_persistent_state);
297 CTDB_NO_MEMORY(ctdb, state);
/* walk every entry of the vnn map and forward the update */
302 for (i = 0; i < ctdb->vnn_map->size; i++) {
303 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
306 /* only send to active nodes */
307 if (node->flags & NODE_FLAGS_INACTIVE) {
311 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
312 CTDB_CONTROL_UPDATE_RECORD,
313 c->client_id, 0, recdata,
314 ctdb_persistent_callback,
317 DEBUG(DEBUG_ERR,("Unable to send "
318 "CTDB_CONTROL_UPDATE_RECORD "
319 "to pnn %u\n", node->pnn));
324 state->num_pending++;
/* nothing was sent, so there is no reply to wait for */
328 if (state->num_pending == 0) {
333 /* we need to wait for the replies */
336 /* need to keep the control structure around */
337 talloc_steal(state, c);
339 /* but we won't wait forever */
340 event_add_timed(ctdb->ev, state,
341 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
342 ctdb_persistent_store_timeout, state);
/*
  Arguments for one local write of a marshalled record set. Allocated
  in ctdb_control_update_record() and handed to ctdb_childwrite().
 */
348 struct ctdb_persistent_write_state {
/* database the records are written to */
349 struct ctdb_db_context *ctdb_db;
/* marshalled records; iterated in ctdb_persistent_store() */
350 struct ctdb_marshall_buffer *m;
/* control that is answered when the write completes or times out */
351 struct ctdb_req_control *c;
356 called from a child process to write the data
/*
  Write every record of state->m into the local tdb of state->ctdb_db
  inside a single tdb transaction.

  state - database and marshall buffer to write

  For each of the m->count records the old record is fetched first. A
  record whose stored RSN is greater than or equal to the new RSN while
  the data differs is reported at DEBUG_CRIT level instead of being
  stored. The transaction is committed after the loop; a
  tdb_transaction_cancel() call is also present, presumably on the
  failure path whose label is not visible in this extract.

  NOTE(review): the return statements and the error checks on ret are
  missing from this extract, so the return values cannot be confirmed.
 */
358 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
361 struct ctdb_rec_data *rec = NULL;
362 struct ctdb_marshall_buffer *m = state->m;
364 ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
366 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
367 state->ctdb_db->db_id));
371 for (i=0;i<m->count;i++) {
372 struct ctdb_ltdb_header oldheader;
373 struct ctdb_ltdb_header header;
374 TDB_DATA key, data, olddata;
/* scratch context for the old record; freed on every path below */
375 TALLOC_CTX *tmp_ctx = talloc_new(state);
/* rec is fed back in so each call advances to the following record */
377 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
380 DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
381 i, state->ctdb_db->db_id));
382 talloc_free(tmp_ctx);
386 /* fetch the old header and ensure the rsn is less than the new rsn */
387 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
389 DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
390 state->ctdb_db->db_id));
391 talloc_free(tmp_ctx);
/* an old-or-equal RSN is only an error when the payload differs */
395 if (oldheader.rsn >= header.rsn &&
396 (olddata.dsize != data.dsize ||
397 memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
398 DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
399 state->ctdb_db->db_id,
400 (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
401 talloc_free(tmp_ctx);
405 talloc_free(tmp_ctx);
407 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
409 DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
410 state->ctdb_db->db_id));
415 ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
417 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
418 state->ctdb_db->db_id));
425 tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
431 called when the child has completed the persistent write
/*
  Completion callback passed to ctdb_childwrite() by
  ctdb_control_update_record().

  status       - result of the write; forwarded unchanged as the status
                 of the control reply
  private_data - the struct ctdb_persistent_write_state of the request

  The reply carries no payload and no error message.
 */
434 static void ctdb_persistent_write_callback(int status, void *private_data)
436 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
437 struct ctdb_persistent_write_state);
440 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
446 called if our lockwait child times out
/*
  Timer callback armed with event_add_timed() in
  ctdb_control_update_record().

  ev, te, t    - timer arguments; not used by the visible lines
  private_data - the struct ctdb_persistent_write_state of the request

  Answers the pending control with status -1 and a fixed timeout
  message.
 */
448 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
449 struct timeval t, void *private_data)
451 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
452 struct ctdb_persistent_write_state);
453 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
/*
  Handle for one write child created by ctdb_childwrite().

  NOTE(review): the embedded line numbers jump from 460 to 464, so some
  members are not visible in this extract; the functions below also
  use h->fd, h->child and h->private_data.
 */
457 struct childwrite_handle {
458 struct ctdb_context *ctdb;
459 struct ctdb_db_context *ctdb_db;
/* fd event watching the read end of the pipe */
460 struct fd_event *fde;
/* completion callback; copied out of the handle in childwrite_handler() */
464 void (*callback)(int, void *);
/* set with timeval_current() once the fd event is installed */
465 struct timeval start_time;
/*
  talloc destructor installed on the handle by ctdb_childwrite().
  Decrements the pending_childwrite_calls statistic and sends SIGKILL
  to the child process recorded in the handle.

  NOTE(review): the return statement is not visible in this extract.
 */
468 static int childwrite_destructor(struct childwrite_handle *h)
470 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
471 kill(h->child, SIGKILL);
/*
  fd event handler installed by ctdb_childwrite() on the read end of
  the pipe.

  ev           - event context; parent of the temporary talloc context
  fde, flags   - event arguments; not used by the visible lines
  private_data - the struct childwrite_handle of the child

  The visible lines copy the callback, its private data and the child
  pid out of the handle, update the childwrite latency and pending
  statistics, move the handle under a temporary context, clear its
  destructor, read one byte from the pipe, send SIGKILL to the child
  and free the temporary context (and with it the handle).

  NOTE(review): the lines that evaluate the byte read from the pipe and
  invoke the callback are missing from this extract.
 */
475 /* called when the child process has finished writing the record to the
478 static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
479 uint16_t flags, void *private_data)
481 struct childwrite_handle *h = talloc_get_type(private_data,
482 struct childwrite_handle);
483 void *p = h->private_data;
484 void (*callback)(int, void *) = h->callback;
485 pid_t child = h->child;
486 TALLOC_CTX *tmp_ctx = talloc_new(ev);
490 CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
491 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
493 /* the handle needs to go away when the context is gone - when
494 the handle goes away this implicitly closes the pipe, which
496 talloc_steal(tmp_ctx, h);
498 talloc_set_destructor(h, NULL);
500 ret = read(h->fd[0], &c, 1);
502 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
508 kill(child, SIGKILL);
509 talloc_free(tmp_ctx);
512 /* this creates a child process which will take out a tdb transaction
513 and write the record to the database.

   ctdb_db  - database to write; also supplies the daemon context
   callback - stored in the handle together with state as private_data
   state    - the records to write; also the talloc parent of the handle

   A pipe is created and ctdb_fork() is called. The child closes the
   read end, runs ctdb_persistent_store(state) and writes one byte to
   the pipe. The other side closes the write end and watches the read
   end with childwrite_handler.

   The pending_childwrite_calls statistic is incremented on entry and
   decremented again on each visible failure path.

   NOTE(review): the return value of write() in the child is not
   checked in the visible code.
   NOTE(review): the return statements are not visible in this extract.
515 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
516 void (*callback)(int, void *private_data),
517 struct ctdb_persistent_write_state *state)
519 struct childwrite_handle *result;
521 pid_t parent = getpid();
523 CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
524 CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
526 if (!(result = talloc_zero(state, struct childwrite_handle))) {
527 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
531 ret = pipe(result->fd);
535 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
539 result->child = ctdb_fork(ctdb_db->ctdb);
541 if (result->child == (pid_t)-1) {
542 close(result->fd[0]);
543 close(result->fd[1]);
545 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
549 result->callback = callback;
550 result->private_data = state;
551 result->ctdb = ctdb_db->ctdb;
552 result->ctdb_db = ctdb_db;
554 if (result->child == 0) {
557 close(result->fd[0]);
558 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
559 ret = ctdb_persistent_store(state);
561 DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
565 write(result->fd[1], &c, 1);
567 /* make sure we die when our parent dies */
568 while (kill(parent, 0) == 0 || errno != ESRCH) {
/* presumably the parent side from here on: keep only the read end */
574 close(result->fd[1]);
575 set_close_on_exec(result->fd[0]);
/* from now on freeing the handle also kills the child */
577 talloc_set_destructor(result, childwrite_destructor);
579 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
581 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
582 EVENT_FD_READ, childwrite_handler,
584 if (result->fde == NULL) {
586 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
589 tevent_fd_set_auto_close(result->fde);
/* start of the interval reported by CTDB_UPDATE_LATENCY in the handler */
591 result->start_time = timeval_current();
597 update a record on this node if the new record has a higher rsn than the
/*
  Handle CTDB_CONTROL_UPDATE_RECORD: write the marshalled records in
  recdata to the local database through a write child.

  ctdb    - daemon context
  c       - the incoming control; kept alive by reparenting it under
            the request state
  recdata - marshall buffer (struct ctdb_marshall_buffer); m->db_id
            names the database

  Error conditions logged by the visible code: recovery being active,
  unknown database, database flagged unhealthy, and failure to set up
  the write child. A timer using ctdb->tunable.control_timeout is armed
  so the request is answered by ctdb_persistent_lock_timeout() if the
  child does not finish in time.

  NOTE(review): m->db_id is read from recdata.dptr without a visible
  check of recdata.dsize.
  NOTE(review): state is allocated with talloc() rather than
  talloc_zero(); only the ctdb_db assignment is visible here, the
  assignments of the other members fall on missing lines.
 */
600 int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
601 struct ctdb_req_control *c, TDB_DATA recdata,
604 struct ctdb_db_context *ctdb_db;
605 struct ctdb_persistent_write_state *state;
606 struct childwrite_handle *handle;
607 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
/* updates are not accepted while recovery is active */
609 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
610 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
614 ctdb_db = find_ctdb_db(ctdb, m->db_id);
615 if (ctdb_db == NULL) {
616 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
620 if (ctdb_db->unhealthy_reason) {
621 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
622 ctdb_db->db_name, ctdb_db->unhealthy_reason));
626 state = talloc(ctdb, struct ctdb_persistent_write_state);
627 CTDB_NO_MEMORY(ctdb, state);
629 state->ctdb_db = ctdb_db;
633 /* create a child process to take out a transaction and
636 handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
637 if (handle == NULL) {
638 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
643 /* we need to wait for the replies */
646 /* need to keep the control structure around */
647 talloc_steal(state, c);
649 /* but we won't wait forever */
650 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
651 ctdb_persistent_lock_timeout, state);
658 called when a client has finished a local commit in a transaction to
659 a persistent database
/*
  Mark the transaction commit on the client's database as finished.

  ctdb - daemon context
  c    - the incoming control; c->client_id identifies the client

  Clears ctdb_db->transaction_active and decrements the client's
  num_persistent_updates counter. If that counter is already zero an
  error is logged and recovery_mode is set to CTDB_RECOVERY_ACTIVE.

  NOTE(review): client is dereferenced (client->db_id) without a
  visible NULL check, although ctdb_reqid_find() results are checked
  for NULL elsewhere in this file - confirm and add a check.
  NOTE(review): the return statements are not visible in this extract.
 */
661 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
662 struct ctdb_req_control *c)
664 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
665 struct ctdb_db_context *ctdb_db;
/* the database is the one recorded on the client, not a parameter */
667 ctdb_db = find_ctdb_db(ctdb, client->db_id);
668 if (ctdb_db == NULL) {
669 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
670 "Unknown database 0x%08x\n", client->db_id));
673 if (!ctdb_db->transaction_active) {
674 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
675 "Database 0x%08x has no transaction commit "
676 "started\n", client->db_id));
680 ctdb_db->transaction_active = false;
/* a zero counter means the bookkeeping is out of sync: force recovery */
683 if (client->num_persistent_updates == 0) {
684 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
685 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
686 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
689 client->num_persistent_updates--;
691 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
692 "transaction commit db_id[0x%08x]\n",
693 client->client_id, ctdb_db->db_id));
699 called when a client gets an error committing its database
700 during a transaction commit
/*
  Record that a client hit an error while committing a transaction.

  ctdb - daemon context
  c    - the incoming control; c->client_id identifies the client

  Clears ctdb_db->transaction_active, adjusts the client's
  num_persistent_updates counter, logs the error and sets
  recovery_mode to CTDB_RECOVERY_ACTIVE.

  NOTE(review): client is dereferenced (client->db_id) without a
  visible NULL check, although ctdb_reqid_find() results are checked
  for NULL elsewhere in this file - confirm and add a check.
  NOTE(review): the return statements and the else branch around the
  decrement are not visible in this extract.
 */
702 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
703 struct ctdb_req_control *c)
705 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
706 struct ctdb_db_context *ctdb_db;
/* the database is the one recorded on the client, not a parameter */
708 ctdb_db = find_ctdb_db(ctdb, client->db_id);
709 if (ctdb_db == NULL) {
710 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
711 "Unknown database 0x%08x\n", client->db_id));
714 if (!ctdb_db->transaction_active) {
715 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
716 "Database 0x%08x has no transaction commit "
717 "started\n", client->db_id));
721 ctdb_db->transaction_active = false;
724 if (client->num_persistent_updates == 0) {
725 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
727 client->num_persistent_updates--;
/* a failed commit may leave nodes out of sync, so recovery is forced */
730 DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
731 " db_id[0x%08x] - forcing recovery\n",
733 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
739 * Tell whether a transaction is active on this node on the given DB.
/*
  Report whether a transaction is active on database db_id.

  ctdb - daemon context
  c    - the incoming control; c->client_id identifies the client

  The visible code distinguishes the case where the calling client is
  itself bound to db_id from the case where ctdb_db->transaction_active
  is set. The values returned in each case are on lines missing from
  this extract.

  NOTE(review): client is dereferenced (client->db_id) without a
  visible NULL check - confirm and add a check.
 */
741 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
742 struct ctdb_req_control *c,
745 struct ctdb_db_context *ctdb_db;
746 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
748 ctdb_db = find_ctdb_db(ctdb, db_id);
750 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
/* the caller's own binding to this database is tested first */
754 if (client->db_id == db_id) {
758 if (ctdb_db->transaction_active) {
766 backwards compatibility:
768 start a persistent store operation. passing both the key, header and
769 data to the daemon. If the client disconnects before it has issued
770 a persistent_update call to the daemon we trigger a full recovery
771 to ensure the databases are brought back in sync.
772 for now we ignore the recdata that the client has passed to us.
/*
  Note that a client has started a persistent update by incrementing
  its num_persistent_updates counter.

  ctdb - daemon context
  c    - the incoming control; c->client_id identifies the client

  An unknown client is logged as an error.

  NOTE(review): the remaining parameter and the return statements are
  not visible in this extract.
 */
774 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
775 struct ctdb_req_control *c,
778 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
780 if (client == NULL) {
781 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
785 client->num_persistent_updates++;
791 backwards compatibility:
793 called to tell ctdbd that it is no longer doing a persistent update
/*
  Note that a client is no longer doing a persistent update by
  decrementing its num_persistent_updates counter.

  ctdb - daemon context
  c    - the incoming control; c->client_id identifies the client

  The counter is only decremented while it is greater than zero. An
  unknown client is logged as an error.

  NOTE(review): the remaining parameter and the return statements are
  not visible in this extract.
 */
795 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
796 struct ctdb_req_control *c,
799 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
801 if (client == NULL) {
802 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
806 if (client->num_persistent_updates > 0) {
807 client->num_persistent_updates--;
815 backwards compatibility:
817 single record variant of ctdb_control_trans2_commit for older clients
/*
  Wrap a single struct ctdb_rec_data in a marshall buffer and hand it
  to ctdb_control_trans2_commit().

  ctdb        - daemon context
  c           - the incoming control; also the talloc parent of the
                marshall buffer
  recdata     - one struct ctdb_rec_data: header, then key bytes, then
                data bytes
  async_reply - passed through to ctdb_control_trans2_commit()

  recdata.dsize must equal the header size plus keylen plus datalen,
  otherwise "Bad data size" is logged.

  NOTE(review): rec->keylen and rec->datalen are read before any
  visible check that recdata.dsize is at least
  offsetof(struct ctdb_rec_data, data) - confirm the caller guarantees
  a minimum size.
  NOTE(review): rec->reqid is passed for two consecutive arguments of
  ctdb_marshall_add() - confirm this is intended.
 */
819 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
820 struct ctdb_req_control *c,
821 TDB_DATA recdata, bool *async_reply)
823 struct ctdb_marshall_buffer *m;
824 struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
827 if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
828 rec->keylen + rec->datalen) {
829 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
/* key bytes come first in rec->data, the record data follows them */
833 key.dptr = &rec->data[0];
834 key.dsize = rec->keylen;
835 data.dptr = &rec->data[rec->keylen];
836 data.dsize = rec->datalen;
838 m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
839 CTDB_NO_MEMORY(ctdb, m);
841 return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
/*
  Fetch the sequence number record of database db_id into *seqnum.

  ctdb - daemon context

  The record is looked up under CTDB_DB_SEQNUM_KEY, with the key size
  including the terminating NUL. The stored data must be exactly
  sizeof(uint64_t) bytes. A temporary talloc context holds the fetched
  data and is freed before returning.

  NOTE(review): the remaining parameters, the error checks on ret and
  the return statements are not visible in this extract.
  NOTE(review): data.dptr is read through a uint64_t pointer cast;
  confirm the buffer is suitably aligned or copy with memcpy().
 */
844 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
849 struct ctdb_db_context *ctdb_db;
850 const char *keyname = CTDB_DB_SEQNUM_KEY;
853 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
855 ctdb_db = find_ctdb_db(ctdb, db_id);
857 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
/* the key is the name string including its terminating NUL */
862 key.dptr = (uint8_t *)discard_const(keyname);
863 key.dsize = strlen(keyname) + 1;
865 ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
/* anything but exactly eight bytes is not a valid sequence number */
870 if (data.dsize != sizeof(uint64_t)) {
875 *seqnum = *(uint64_t *)data.dptr;
878 talloc_free(mem_ctx);
883 * Get the sequence number of a persistent database.
/*
  Control handler returning the sequence number of a database.

  ctdb - daemon context

  The database id is read as a uint32_t from indata.dptr. The result is
  returned in outdata as a freshly allocated buffer of
  sizeof(uint64_t) bytes.

  NOTE(review): the remaining parameters, the error checks and the end
  of the function are not visible in this extract.
  NOTE(review): indata.dptr is dereferenced without a visible check
  that indata.dsize is at least sizeof(uint32_t).
 */
885 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
893 db_id = *(uint32_t *)indata.dptr;
894 ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
899 outdata->dsize = sizeof(uint64_t);
900 outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
901 if (outdata->dptr == NULL) {
/* NOTE(review): the cast above suggests dptr is a uint8_t pointer, in
   which case this assignment stores only the low byte of seqnum and
   leaves the other seven bytes zero; the full value would have to be
   stored through a uint64_t pointer or copied with memcpy() - confirm
   and fix */
906 *(outdata->dptr) = seqnum;