4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
29 struct ctdb_persistent_state {
30 struct ctdb_context *ctdb;
31 struct ctdb_req_control *c;
35 uint32_t num_failed, num_sent;
39 1) all nodes fail, and all nodes reply
40 2) some nodes fail, all nodes reply
46 called when a node has acknowledged a ctdb_control_update_record call
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49 int32_t status, TDB_DATA data,
53 struct ctdb_persistent_state *state = talloc_get_type(private_data,
54 struct ctdb_persistent_state);
55 enum ctdb_trans2_commit_error etype;
57 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
58 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
59 "during recovery\n"));
64 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
66 state->status = status;
67 state->errormsg = errormsg;
71 * If a node failed to complete the update_record control,
72 * then either a recovery is already running or something
73 * bad is going on. So trigger a recovery and let the
74 * recovery finish the transaction, sending back the reply
75 * for the trans3_commit control to the client.
77 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
83 if (state->num_pending != 0) {
87 if (state->num_failed == state->num_sent) {
88 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
89 } else if (state->num_failed != 0) {
90 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
92 etype = CTDB_TRANS2_COMMIT_SUCCESS;
95 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
100 called if persistent store times out
102 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te,
103 struct timeval t, void *private_data)
105 struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
107 if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
108 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
109 "timeout during recovery\n"));
113 ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT,
114 "timeout in ctdb_persistent_state");
120 store a set of persistent records - called from a ctdb client when it has updated
121 some records in a persistent database. The client will have the record
122 locked for the duration of this call. The client is the dmaster when
125 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
126 struct ctdb_req_control *c,
127 TDB_DATA recdata, bool *async_reply)
129 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
130 struct ctdb_persistent_state *state;
132 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
133 struct ctdb_db_context *ctdb_db;
135 ctdb_db = find_ctdb_db(ctdb, m->db_id);
136 if (ctdb_db == NULL) {
137 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
138 "Unknown database db_id[0x%08x]\n", m->db_id));
142 if (client == NULL) {
143 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
147 if (ctdb_db->unhealthy_reason) {
148 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
149 ctdb_db->db_name, ctdb_db->unhealthy_reason));
153 /* handling num_persistent_updates is a bit strange -
155 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
156 They don't expect num_persistent_updates to be used at all
158 2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
159 this commit to then decrement it
161 3) new clients which use TRANS2 commit functions, and
162 expect this function to increment the counter, and
163 then have it decremented in ctdb_control_trans2_error
164 or ctdb_control_trans2_finished
167 case CTDB_CONTROL_PERSISTENT_STORE:
168 if (ctdb_db->transaction_active) {
169 DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
170 "transaction is active on database "
171 "db_id[0x%08x] - refusing persistent "
172 " store for client id[0x%08x]\n",
173 ctdb_db->db_id, client->client_id));
176 if (client->num_persistent_updates > 0) {
177 client->num_persistent_updates--;
180 case CTDB_CONTROL_TRANS2_COMMIT:
181 if (ctdb_db->transaction_active) {
182 DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
183 " already a transaction commit "
184 "active on db_id[0x%08x] - forbidding "
185 "client_id[0x%08x] to commit\n",
186 ctdb_db->db_id, client->client_id));
189 if (client->db_id != 0) {
190 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
191 "client-db_id[0x%08x] != 0 "
192 "(client_id[0x%08x])\n",
193 client->db_id, client->client_id));
196 client->num_persistent_updates++;
197 ctdb_db->transaction_active = true;
198 client->db_id = m->db_id;
199 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
200 " commit transaction on db id[0x%08x]\n",
201 client->client_id, client->db_id));
203 case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
204 /* already updated from the first commit */
205 if (client->db_id != m->db_id) {
206 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
207 "retry: client-db_id[0x%08x] != "
208 "db_id[0x%08x] (client_id[0x%08x])\n",
210 m->db_id, client->client_id));
213 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
214 "transaction commit retry on "
216 client->client_id, client->db_id));
220 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
221 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
225 state = talloc_zero(ctdb, struct ctdb_persistent_state);
226 CTDB_NO_MEMORY(ctdb, state);
231 for (i=0;i<ctdb->vnn_map->size;i++) {
232 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
235 /* only send to active nodes */
236 if (node->flags & NODE_FLAGS_INACTIVE) {
240 /* don't send to ourselves */
241 if (node->pnn == ctdb->pnn) {
245 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
246 c->client_id, 0, recdata,
247 ctdb_persistent_callback, state);
249 DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
254 state->num_pending++;
258 if (state->num_pending == 0) {
263 /* we need to wait for the replies */
266 /* need to keep the control structure around */
267 talloc_steal(state, c);
269 /* but we won't wait forever */
270 event_add_timed(ctdb->ev, state,
271 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
272 ctdb_persistent_store_timeout, state);
279 * Store a set of persistent records.
280 * This is used to roll out a transaction to all nodes.
282 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
283 struct ctdb_req_control *c,
284 TDB_DATA recdata, bool *async_reply)
286 struct ctdb_client *client;
287 struct ctdb_persistent_state *state;
289 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
290 struct ctdb_db_context *ctdb_db;
292 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
293 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
297 ctdb_db = find_ctdb_db(ctdb, m->db_id);
298 if (ctdb_db == NULL) {
299 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
300 "Unknown database db_id[0x%08x]\n", m->db_id));
304 client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
305 if (client == NULL) {
306 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
307 "to a client. Returning error\n"));
311 state = talloc_zero(ctdb, struct ctdb_persistent_state);
312 CTDB_NO_MEMORY(ctdb, state);
317 for (i = 0; i < ctdb->vnn_map->size; i++) {
318 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
321 /* only send to active nodes */
322 if (node->flags & NODE_FLAGS_INACTIVE) {
326 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
327 CTDB_CONTROL_UPDATE_RECORD,
328 c->client_id, 0, recdata,
329 ctdb_persistent_callback,
332 DEBUG(DEBUG_ERR,("Unable to send "
333 "CTDB_CONTROL_UPDATE_RECORD "
334 "to pnn %u\n", node->pnn));
339 state->num_pending++;
343 if (state->num_pending == 0) {
348 /* we need to wait for the replies */
351 /* need to keep the control structure around */
352 talloc_steal(state, c);
354 /* but we won't wait forever */
355 event_add_timed(ctdb->ev, state,
356 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
357 ctdb_persistent_store_timeout, state);
363 struct ctdb_persistent_write_state {
364 struct ctdb_db_context *ctdb_db;
365 struct ctdb_marshall_buffer *m;
366 struct ctdb_req_control *c;
371 called from a child process to write the data
373 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
376 struct ctdb_rec_data *rec = NULL;
377 struct ctdb_marshall_buffer *m = state->m;
379 ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
381 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
382 state->ctdb_db->db_id));
386 for (i=0;i<m->count;i++) {
387 struct ctdb_ltdb_header oldheader;
388 struct ctdb_ltdb_header header;
389 TDB_DATA key, data, olddata;
390 TALLOC_CTX *tmp_ctx = talloc_new(state);
392 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
395 DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
396 i, state->ctdb_db->db_id));
397 talloc_free(tmp_ctx);
401 /* fetch the old header and ensure the rsn is less than the new rsn */
402 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
404 DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
405 state->ctdb_db->db_id));
406 talloc_free(tmp_ctx);
410 if (oldheader.rsn >= header.rsn &&
411 (olddata.dsize != data.dsize ||
412 memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
413 DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
414 state->ctdb_db->db_id,
415 (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
416 talloc_free(tmp_ctx);
420 talloc_free(tmp_ctx);
422 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
424 DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
425 state->ctdb_db->db_id));
430 ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
432 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
433 state->ctdb_db->db_id));
440 tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
446 called when we the child has completed the persistent write
449 static void ctdb_persistent_write_callback(int status, void *private_data)
451 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
452 struct ctdb_persistent_write_state);
455 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
461 called if our lockwait child times out
463 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
464 struct timeval t, void *private_data)
466 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
467 struct ctdb_persistent_write_state);
468 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
472 struct childwrite_handle {
473 struct ctdb_context *ctdb;
474 struct ctdb_db_context *ctdb_db;
475 struct fd_event *fde;
479 void (*callback)(int, void *);
480 struct timeval start_time;
483 static int childwrite_destructor(struct childwrite_handle *h)
485 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
486 kill(h->child, SIGKILL);
490 /* called when the child process has finished writing the record to the
493 static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
494 uint16_t flags, void *private_data)
496 struct childwrite_handle *h = talloc_get_type(private_data,
497 struct childwrite_handle);
498 void *p = h->private_data;
499 void (*callback)(int, void *) = h->callback;
500 pid_t child = h->child;
501 TALLOC_CTX *tmp_ctx = talloc_new(ev);
505 CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
506 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
508 /* the handle needs to go away when the context is gone - when
509 the handle goes away this implicitly closes the pipe, which
511 talloc_steal(tmp_ctx, h);
513 talloc_set_destructor(h, NULL);
515 ret = read(h->fd[0], &c, 1);
517 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
523 kill(child, SIGKILL);
524 talloc_free(tmp_ctx);
527 /* this creates a child process which will take out a tdb transaction
528 and write the record to the database.
530 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
531 void (*callback)(int, void *private_data),
532 struct ctdb_persistent_write_state *state)
534 struct childwrite_handle *result;
536 pid_t parent = getpid();
538 CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
539 CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
541 if (!(result = talloc_zero(state, struct childwrite_handle))) {
542 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
546 ret = pipe(result->fd);
550 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
554 result->child = ctdb_fork(ctdb_db->ctdb);
556 if (result->child == (pid_t)-1) {
557 close(result->fd[0]);
558 close(result->fd[1]);
560 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
564 result->callback = callback;
565 result->private_data = state;
566 result->ctdb = ctdb_db->ctdb;
567 result->ctdb_db = ctdb_db;
569 if (result->child == 0) {
572 close(result->fd[0]);
573 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
574 ret = ctdb_persistent_store(state);
576 DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
580 write(result->fd[1], &c, 1);
582 /* make sure we die when our parent dies */
583 while (kill(parent, 0) == 0 || errno != ESRCH) {
589 close(result->fd[1]);
590 set_close_on_exec(result->fd[0]);
592 talloc_set_destructor(result, childwrite_destructor);
594 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
596 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
597 EVENT_FD_READ, childwrite_handler,
599 if (result->fde == NULL) {
601 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
604 tevent_fd_set_auto_close(result->fde);
606 result->start_time = timeval_current();
612 update a record on this node if the new record has a higher rsn than the
615 int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
616 struct ctdb_req_control *c, TDB_DATA recdata,
619 struct ctdb_db_context *ctdb_db;
620 struct ctdb_persistent_write_state *state;
621 struct childwrite_handle *handle;
622 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
624 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
625 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
629 ctdb_db = find_ctdb_db(ctdb, m->db_id);
630 if (ctdb_db == NULL) {
631 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
635 if (ctdb_db->unhealthy_reason) {
636 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
637 ctdb_db->db_name, ctdb_db->unhealthy_reason));
641 state = talloc(ctdb, struct ctdb_persistent_write_state);
642 CTDB_NO_MEMORY(ctdb, state);
644 state->ctdb_db = ctdb_db;
648 /* create a child process to take out a transaction and
651 handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
652 if (handle == NULL) {
653 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
658 /* we need to wait for the replies */
661 /* need to keep the control structure around */
662 talloc_steal(state, c);
664 /* but we won't wait forever */
665 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
666 ctdb_persistent_lock_timeout, state);
673 called when a client has finished a local commit in a transaction to
674 a persistent database
676 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
677 struct ctdb_req_control *c)
679 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
680 struct ctdb_db_context *ctdb_db;
682 ctdb_db = find_ctdb_db(ctdb, client->db_id);
683 if (ctdb_db == NULL) {
684 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
685 "Unknown database 0x%08x\n", client->db_id));
688 if (!ctdb_db->transaction_active) {
689 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
690 "Database 0x%08x has no transaction commit "
691 "started\n", client->db_id));
695 ctdb_db->transaction_active = false;
698 if (client->num_persistent_updates == 0) {
699 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
700 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
701 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
704 client->num_persistent_updates--;
706 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
707 "transaction commit db_id[0x%08x]\n",
708 client->client_id, ctdb_db->db_id));
714 called when a client gets an error committing its database
715 during a transaction commit
717 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
718 struct ctdb_req_control *c)
720 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
721 struct ctdb_db_context *ctdb_db;
723 ctdb_db = find_ctdb_db(ctdb, client->db_id);
724 if (ctdb_db == NULL) {
725 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
726 "Unknown database 0x%08x\n", client->db_id));
729 if (!ctdb_db->transaction_active) {
730 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
731 "Database 0x%08x has no transaction commit "
732 "started\n", client->db_id));
736 ctdb_db->transaction_active = false;
739 if (client->num_persistent_updates == 0) {
740 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
742 client->num_persistent_updates--;
745 DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
746 " db_id[0x%08x] - forcing recovery\n",
748 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
754 * Tell whether a transaction is active on this node on the give DB.
756 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
757 struct ctdb_req_control *c,
760 struct ctdb_db_context *ctdb_db;
761 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
763 ctdb_db = find_ctdb_db(ctdb, db_id);
765 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
769 if (client->db_id == db_id) {
773 if (ctdb_db->transaction_active) {
781 backwards compatibility:
783 start a persistent store operation. passing both the key, header and
784 data to the daemon. If the client disconnects before it has issued
785 a persistent_update call to the daemon we trigger a full recovery
786 to ensure the databases are brought back in sync.
787 for now we ignore the recdata that the client has passed to us.
789 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
790 struct ctdb_req_control *c,
793 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
795 if (client == NULL) {
796 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
800 client->num_persistent_updates++;
806 backwards compatibility:
808 called to tell ctdbd that it is no longer doing a persistent update
810 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
811 struct ctdb_req_control *c,
814 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
816 if (client == NULL) {
817 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
821 if (client->num_persistent_updates > 0) {
822 client->num_persistent_updates--;
830 backwards compatibility:
832 single record varient of ctdb_control_trans2_commit for older clients
834 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
835 struct ctdb_req_control *c,
836 TDB_DATA recdata, bool *async_reply)
838 struct ctdb_marshall_buffer *m;
839 struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
842 if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
843 rec->keylen + rec->datalen) {
844 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
848 key.dptr = &rec->data[0];
849 key.dsize = rec->keylen;
850 data.dptr = &rec->data[rec->keylen];
851 data.dsize = rec->datalen;
853 m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
854 CTDB_NO_MEMORY(ctdb, m);
856 return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
859 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
864 struct ctdb_db_context *ctdb_db;
865 const char *keyname = CTDB_DB_SEQNUM_KEY;
868 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
870 ctdb_db = find_ctdb_db(ctdb, db_id);
872 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
877 key.dptr = (uint8_t *)discard_const(keyname);
878 key.dsize = strlen(keyname) + 1;
880 ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
885 if (data.dsize != sizeof(uint64_t)) {
890 *seqnum = *(uint64_t *)data.dptr;
893 talloc_free(mem_ctx);
898 * Get the sequence number of a persistent database.
900 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
908 db_id = *(uint32_t *)indata.dptr;
909 ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
914 outdata->dsize = sizeof(uint64_t);
915 outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
916 if (outdata->dptr == NULL) {
921 *(outdata->dptr) = seqnum;