4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
/*
 * Per-request bookkeeping shared by the reply callback and the timeout
 * handler in this file; both receive it as their private_data.
 * NOTE(review): code in this file also uses state->num_pending,
 * state->status and state->errormsg, whose declarations are not among the
 * lines visible here - confirm against the complete struct.
 */
29 struct ctdb_persistent_state {
/* daemon context handed to ctdb_request_control_reply() */
30 struct ctdb_context *ctdb;
/* the control that is answered through ctdb_request_control_reply() */
31 struct ctdb_req_control *c;
/* counters compared in ctdb_persistent_callback() to pick the reply code */
35 uint32_t num_failed, num_sent;
39 1) all nodes fail, and all nodes reply
40 2) some nodes fail, all nodes reply
46 called when a node has acknowledged a ctdb_control_update_record call
/*
 * Reply handler for a CTDB_CONTROL_UPDATE_RECORD sent by the commit controls
 * in this file (it is the callback they pass to ctdb_daemon_send_control()).
 *
 * ctdb         - daemon context
 * status       - result reported for the remote control
 * data         - reply payload (not referenced in the visible lines)
 * private_data - the struct ctdb_persistent_state of the fan-out
 *
 * When state->num_pending is zero the outcome is mapped to a
 * CTDB_TRANS2_COMMIT_* code and sent back for state->c.
 * NOTE(review): parts of this function (remaining parameters, the condition
 * guarding the failure branch, the counter updates) are not among the lines
 * visible here; the comments describe only what can be seen.
 */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49 int32_t status, TDB_DATA data,
53 struct ctdb_persistent_state *state = talloc_get_type(private_data,
54 struct ctdb_persistent_state);
/* a reply seen while recovery_mode is not NORMAL is logged as ignored */
56 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
57 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
58 "during recovery\n"));
/* failure path: log it and keep status and message for the final reply */
63 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
65 state->status = status;
66 state->errormsg = errormsg;
70 * If a node failed to complete the update_record control,
71 * then either a recovery is already running or something
72 * bad is going on. So trigger a recovery and let the
73 * recovery finish the transaction, sending back the reply
74 * for the trans3_commit control to the client.
76 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * No reply outstanding any more: ALLFAIL when every sent control failed,
 * SOMEFAIL when at least one did, SUCCESS otherwise.
 */
81 if (state->num_pending == 0) {
82 enum ctdb_trans2_commit_error etype;
83 if (state->num_failed == state->num_sent) {
84 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
85 } else if (state->num_failed != 0) {
86 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
88 etype = CTDB_TRANS2_COMMIT_SUCCESS;
90 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
96 called if persistent store times out
/*
 * Timer callback armed by the commit controls in this file with
 * ctdb->tunable.control_timeout; private_data is the
 * struct ctdb_persistent_state of the pending commit.
 */
98 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te,
99 struct timeval t, void *private_data)
101 struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
/* while recovery_mode is not NORMAL the timeout is logged as ignored */
103 if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
104 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
105 "timeout during recovery\n"));
/* answer the waiting control with CTDB_TRANS2_COMMIT_TIMEOUT */
109 ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT,
110 "timeout in ctdb_persistent_state");
116 store a set of persistent records - called from a ctdb client when it has updated
117 some records in a persistent database. The client will have the record
118 locked for the duration of this call. The client is the dmaster when
/*
 * Control handler that pushes a marshalled set of persistent records to the
 * other nodes.
 *
 * ctdb        - daemon context
 * c           - the incoming control; c->client_id identifies the client
 * recdata     - payload, read as a struct ctdb_marshall_buffer
 * async_reply - out flag (not referenced in the visible lines)
 *
 * Visible flow: look up the client and the database named by m->db_id, log
 * when either is unknown or the database is flagged unhealthy, do the
 * per-opcode transaction bookkeeping, refuse while a recovery is active,
 * then send CTDB_CONTROL_UPDATE_RECORD to every active node in the vnn map
 * except this one and arm a timeout for the replies.
 *
 * NOTE(review): m->db_id is read straight from recdata.dptr and no check of
 * recdata.dsize is visible before that dereference - confirm the buffer is
 * validated.  The return statements and the switch header are not among
 * the lines visible here.
 */
121 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
122 struct ctdb_req_control *c,
123 TDB_DATA recdata, bool *async_reply)
125 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
126 struct ctdb_persistent_state *state;
128 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
129 struct ctdb_db_context *ctdb_db;
131 ctdb_db = find_ctdb_db(ctdb, m->db_id);
132 if (ctdb_db == NULL) {
133 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
134 "Unknown database db_id[0x%08x]\n", m->db_id));
138 if (client == NULL) {
139 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
143 if (ctdb_db->unhealthy_reason) {
144 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
145 ctdb_db->db_name, ctdb_db->unhealthy_reason));
149 /* handling num_persistent_updates is a bit strange -
151 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
152 They don't expect num_persistent_updates to be used at all
154 2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
155 this commit to then decrement it
157 3) new clients which use TRANS2 commit functions, and
158 expect this function to increment the counter, and
159 then have it decremented in ctdb_control_trans2_error
160 or ctdb_control_trans2_finished
163 case CTDB_CONTROL_PERSISTENT_STORE:
164 if (ctdb_db->transaction_active) {
165 DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
166 "transaction is active on database "
167 "db_id[0x%08x] - refusing persistent "
168 " store for client id[0x%08x]\n",
169 ctdb_db->db_id, client->client_id));
172 if (client->num_persistent_updates > 0) {
173 client->num_persistent_updates--;
176 case CTDB_CONTROL_TRANS2_COMMIT:
177 if (ctdb_db->transaction_active) {
178 DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
179 " already a transaction commit "
180 "active on db_id[0x%08x] - forbidding "
181 "client_id[0x%08x] to commit\n",
182 ctdb_db->db_id, client->client_id));
185 if (client->db_id != 0) {
186 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
187 "client-db_id[0x%08x] != 0 "
188 "(client_id[0x%08x])\n",
189 client->db_id, client->client_id));
192 client->num_persistent_updates++;
193 ctdb_db->transaction_active = true;
194 client->db_id = m->db_id;
195 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
196 " commit transaction on db id[0x%08x]\n",
197 client->client_id, client->db_id));
199 case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
200 /* already updated from the first commit */
201 if (client->db_id != m->db_id) {
202 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
203 "retry: client-db_id[0x%08x] != "
204 "db_id[0x%08x] (client_id[0x%08x])\n",
206 m->db_id, client->client_id));
209 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
210 "transaction commit retry on "
212 client->client_id, client->db_id));
216 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
217 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
/* reply bookkeeping, allocated zeroed as a talloc child of ctdb */
221 state = talloc_zero(ctdb, struct ctdb_persistent_state);
222 CTDB_NO_MEMORY(ctdb, state);
/* fan the records out to the nodes listed in the vnn map */
227 for (i=0;i<ctdb->vnn_map->size;i++) {
228 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
231 /* only send to active nodes */
232 if (node->flags & NODE_FLAGS_INACTIVE) {
236 /* don't send to ourselves */
237 if (node->pnn == ctdb->pnn) {
241 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
242 c->client_id, 0, recdata,
243 ctdb_persistent_callback, state);
245 DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
250 state->num_pending++;
/* no control was queued (the body of this branch is not visible here) */
254 if (state->num_pending == 0) {
259 /* we need to wait for the replies */
262 /* need to keep the control structure around */
263 talloc_steal(state, c);
265 /* but we won't wait forever */
266 event_add_timed(ctdb->ev, state,
267 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
268 ctdb_persistent_store_timeout, state);
275 * Store a set of persistent records.
276 * This is used to roll out a transaction to all nodes.
/*
 * Control handler that rolls a marshalled set of persistent records out to
 * the nodes in the vnn map.
 *
 * ctdb        - daemon context
 * c           - the incoming control; c->client_id identifies the client
 * recdata     - payload, read as a struct ctdb_marshall_buffer
 * async_reply - out flag (not referenced in the visible lines)
 *
 * Visible flow: refuse while a recovery is active, look up the database and
 * the client, then send CTDB_CONTROL_UPDATE_RECORD to every node that is not
 * flagged inactive and arm a timeout for the replies.  No skip of the local
 * node is visible in this loop, in contrast to ctdb_control_trans2_commit().
 *
 * NOTE(review): m->db_id is read from recdata.dptr with no visible check of
 * recdata.dsize - confirm the buffer is validated.  Return statements are
 * not among the lines visible here.
 */
278 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
279 struct ctdb_req_control *c,
280 TDB_DATA recdata, bool *async_reply)
282 struct ctdb_client *client;
283 struct ctdb_persistent_state *state;
285 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
286 struct ctdb_db_context *ctdb_db;
288 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
289 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
293 ctdb_db = find_ctdb_db(ctdb, m->db_id);
294 if (ctdb_db == NULL) {
295 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
296 "Unknown database db_id[0x%08x]\n", m->db_id));
300 client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
301 if (client == NULL) {
302 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
303 "to a client. Returning error\n"));
/* reply bookkeeping, allocated zeroed as a talloc child of ctdb */
307 state = talloc_zero(ctdb, struct ctdb_persistent_state);
308 CTDB_NO_MEMORY(ctdb, state);
313 for (i = 0; i < ctdb->vnn_map->size; i++) {
314 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
317 /* only send to active nodes */
318 if (node->flags & NODE_FLAGS_INACTIVE) {
322 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
323 CTDB_CONTROL_UPDATE_RECORD,
324 c->client_id, 0, recdata,
325 ctdb_persistent_callback,
328 DEBUG(DEBUG_ERR,("Unable to send "
329 "CTDB_CONTROL_UPDATE_RECORD "
330 "to pnn %u\n", node->pnn));
335 state->num_pending++;
/* no control was queued (the body of this branch is not visible here) */
339 if (state->num_pending == 0) {
344 /* we need to wait for the replies */
347 /* need to keep the control structure around */
348 talloc_steal(state, c);
350 /* but we won't wait forever */
351 event_add_timed(ctdb->ev, state,
352 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
353 ctdb_persistent_store_timeout, state);
/*
 * Context for one CTDB_CONTROL_UPDATE_RECORD being applied locally: handed
 * to ctdb_childwrite() and to the write-completion and timeout callbacks.
 */
359 struct ctdb_persistent_write_state {
/* database the records are written to */
360 struct ctdb_db_context *ctdb_db;
/* marshalled records walked by ctdb_persistent_store() */
361 struct ctdb_marshall_buffer *m;
/* the control that is answered when the write finishes or times out */
362 struct ctdb_req_control *c;
367 called from a child process to write the data
/*
 * Write every record in state->m to the local tdb of state->ctdb_db inside
 * one tdb transaction.  ctdb_childwrite() calls this in the forked child.
 *
 * For each record the existing copy is fetched first; the update is refused
 * (DEBUG_CRIT) when the stored RSN is not smaller than the new one and the
 * stored data differs from the new data in size or content.
 *
 * NOTE(review): the error-check conditions and return statements are not
 * among the lines visible here; the tdb_transaction_cancel() on the last
 * line is presumably the shared failure exit - confirm.
 */
369 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
372 struct ctdb_rec_data *rec = NULL;
373 struct ctdb_marshall_buffer *m = state->m;
375 ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
377 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
378 state->ctdb_db->db_id));
382 for (i=0;i<m->count;i++) {
383 struct ctdb_ltdb_header oldheader;
384 struct ctdb_ltdb_header header;
385 TDB_DATA key, data, olddata;
/* scratch context for the old record; freed on every path shown below */
386 TALLOC_CTX *tmp_ctx = talloc_new(state);
/* rec is the cursor: passing the previous record yields the next one */
388 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
391 DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
392 i, state->ctdb_db->db_id));
393 talloc_free(tmp_ctx);
397 /* fetch the old header and ensure the rsn is less than the new rsn */
398 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
400 DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
401 state->ctdb_db->db_id));
402 talloc_free(tmp_ctx);
/* an equal or older RSN is tolerated only when the data is identical */
406 if (oldheader.rsn >= header.rsn &&
407 (olddata.dsize != data.dsize ||
408 memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
409 DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
410 state->ctdb_db->db_id,
411 (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
412 talloc_free(tmp_ctx);
416 talloc_free(tmp_ctx);
418 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
420 DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
421 state->ctdb_db->db_id));
426 ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
428 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
429 state->ctdb_db->db_id));
436 tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
442 called when the child has completed the persistent write
/*
 * Completion callback given to ctdb_childwrite() by
 * ctdb_control_update_record(): forwards the status of the write as the
 * reply to the waiting control state->c.
 *
 * status       - result passed on by the childwrite machinery
 * private_data - the struct ctdb_persistent_write_state of the request
 */
445 static void ctdb_persistent_write_callback(int status, void *private_data)
447 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
448 struct ctdb_persistent_write_state);
451 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
457 called if our lockwait child times out
/*
 * Timer callback armed by ctdb_control_update_record(): answers the waiting
 * control with status -1 and a timeout message.  private_data is the
 * struct ctdb_persistent_write_state of the request.
 */
459 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
460 struct timeval t, void *private_data)
462 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
463 struct ctdb_persistent_write_state);
464 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
/*
 * Handle for one forked writer child created by ctdb_childwrite().
 * NOTE(review): code in this file also uses h->child, h->fd[] and
 * h->private_data, whose declarations are not among the lines visible here.
 */
468 struct childwrite_handle {
/* daemon context, used for the childwrite statistics */
469 struct ctdb_context *ctdb;
/* database the child writes to */
470 struct ctdb_db_context *ctdb_db;
/* fd event watching the read end of the pipe to the child */
471 struct fd_event *fde;
/* completion callback, invoked with a status and the private data */
475 void (*callback)(int, void *);
/* set when the handle is created; fed to the latency statistic */
476 struct timeval start_time;
/*
 * talloc destructor installed on the handle by ctdb_childwrite(): drops the
 * pending_childwrite_calls statistic and sends SIGKILL to the writer child.
 */
479 static int childwrite_destructor(struct childwrite_handle *h)
481 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
482 kill(h->child, SIGKILL);
486 /* called when the child process has finished writing the record to the
// fd-event callback registered by ctdb_childwrite() on the read end of the
// pipe; private_data is the struct childwrite_handle of the writer child.
// Visible steps: copy callback, private data and child pid out of the
// handle, update the latency and pending statistics, reparent the handle to
// a temporary context, read one byte from the pipe, kill the child and free
// the temporary context.
// NOTE(review): the declarations of ret and c and the invocation of the
// callback are not among the lines visible here.
489 static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
490 uint16_t flags, void *private_data)
492 struct childwrite_handle *h = talloc_get_type(private_data,
493 struct childwrite_handle);
494 void *p = h->private_data;
495 void (*callback)(int, void *) = h->callback;
496 pid_t child = h->child;
497 TALLOC_CTX *tmp_ctx = talloc_new(ev);
501 CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
502 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
504 /* the handle needs to go away when the context is gone - when
505 the handle goes away this implicitly closes the pipe, which
507 talloc_steal(tmp_ctx, h);
// the destructor is cleared here; the statistic it would decrement has
// already been decremented above and the child is killed explicitly below
509 talloc_set_destructor(h, NULL);
511 ret = read(h->fd[0], &c, 1);
513 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
519 kill(child, SIGKILL);
520 talloc_free(tmp_ctx);
523 /* this creates a child process which will take out a tdb transaction
524 and write the record to the database.
// Fork a child that runs ctdb_persistent_store(state) and reports back over
// a pipe.
//
// ctdb_db  - database to write to
// callback - completion callback stored in the handle
// state    - write request; also the talloc parent of the returned handle
//
// The handle is allocated with talloc_zero(); the NULL returns on the
// failure paths are not among the lines visible here.
// pending_childwrite_calls is incremented up front and decremented again on
// every failure path shown.
526 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
527 void (*callback)(int, void *private_data),
528 struct ctdb_persistent_write_state *state)
530 struct childwrite_handle *result;
// remembered before the fork so the child can poll for the death of the parent
532 pid_t parent = getpid();
534 CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
535 CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
537 if (!(result = talloc_zero(state, struct childwrite_handle))) {
538 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
542 ret = pipe(result->fd);
546 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
550 result->child = ctdb_fork(ctdb_db->ctdb);
552 if (result->child == (pid_t)-1) {
553 close(result->fd[0]);
554 close(result->fd[1]);
556 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
560 result->callback = callback;
561 result->private_data = state;
562 result->ctdb = ctdb_db->ctdb;
563 result->ctdb_db = ctdb_db;
// child side: do the write, report one byte, then wait for the parent to exit
565 if (result->child == 0) {
568 close(result->fd[0]);
569 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
570 ret = ctdb_persistent_store(state);
572 DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
// NOTE(review): the result of write() is ignored on this line; a failed or
// short write would leave the parent without a status byte - confirm this
// is intended
576 write(result->fd[1], &c, 1);
578 /* make sure we die when our parent dies */
579 while (kill(parent, 0) == 0 || errno != ESRCH) {
// parent side: keep only the read end and watch it for the status byte
585 close(result->fd[1]);
586 set_close_on_exec(result->fd[0]);
588 talloc_set_destructor(result, childwrite_destructor);
590 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
592 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
593 EVENT_FD_READ, childwrite_handler,
595 if (result->fde == NULL) {
597 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
600 tevent_fd_set_auto_close(result->fde);
602 result->start_time = timeval_current();
608 update a record on this node if the new record has a higher rsn than the
/*
 * Handler for CTDB_CONTROL_UPDATE_RECORD: applies the marshalled records in
 * recdata to the local copy of the database through a forked writer child.
 *
 * ctdb    - daemon context
 * c       - the incoming control, answered from the write or timeout callback
 * recdata - payload, read as a struct ctdb_marshall_buffer
 *
 * Visible flow: refuse while a recovery is active, log when the database is
 * unknown or flagged unhealthy, allocate the write state, start the child
 * with ctdb_childwrite() and arm a timeout.
 *
 * NOTE(review): state comes from talloc(), which does not zero the memory,
 * and only the assignment of state->ctdb_db is visible; confirm that
 * state->c and state->m are set on the lines not shown.  m->db_id is read
 * with no visible check of recdata.dsize.
 */
611 int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
612 struct ctdb_req_control *c, TDB_DATA recdata,
615 struct ctdb_db_context *ctdb_db;
616 struct ctdb_persistent_write_state *state;
617 struct childwrite_handle *handle;
618 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
620 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
621 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
625 ctdb_db = find_ctdb_db(ctdb, m->db_id);
626 if (ctdb_db == NULL) {
627 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
631 if (ctdb_db->unhealthy_reason) {
632 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
633 ctdb_db->db_name, ctdb_db->unhealthy_reason));
637 state = talloc(ctdb, struct ctdb_persistent_write_state);
638 CTDB_NO_MEMORY(ctdb, state);
640 state->ctdb_db = ctdb_db;
644 /* create a child process to take out a transaction and
647 handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
648 if (handle == NULL) {
649 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
654 /* we need to wait for the replies */
657 /* need to keep the control structure around */
658 talloc_steal(state, c);
660 /* but we won't wait forever */
661 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
662 ctdb_persistent_lock_timeout, state);
669 called when a client has finished a local commit in a transaction to
670 a persistent database
/*
 * Control handler run when a client reports that its transaction commit is
 * done: clears ctdb_db->transaction_active for the database recorded in
 * client->db_id and decrements client->num_persistent_updates.
 *
 * NOTE(review): client comes from ctdb_reqid_find() and is dereferenced
 * (client->db_id) with no NULL check visible in between, whereas
 * ctdb_control_start_persistent_update() does check for NULL - confirm a
 * stale client_id cannot reach this point.  Return statements are not among
 * the lines visible here.
 */
672 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
673 struct ctdb_req_control *c)
675 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
676 struct ctdb_db_context *ctdb_db;
678 ctdb_db = find_ctdb_db(ctdb, client->db_id);
679 if (ctdb_db == NULL) {
680 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
681 "Unknown database 0x%08x\n", client->db_id));
684 if (!ctdb_db->transaction_active) {
685 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
686 "Database 0x%08x has no transaction commit "
687 "started\n", client->db_id));
691 ctdb_db->transaction_active = false;
/* a zero counter here is treated as an inconsistency and forces a recovery */
694 if (client->num_persistent_updates == 0) {
695 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
696 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
697 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
700 client->num_persistent_updates--;
702 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
703 "transaction commit db_id[0x%08x]\n",
704 client->client_id, ctdb_db->db_id));
710 called when a client gets an error committing its database
711 during a transaction commit
/*
 * Control handler run when a client reports an error while committing a
 * transaction: clears ctdb_db->transaction_active for the database recorded
 * in client->db_id, adjusts client->num_persistent_updates and sets the
 * recovery mode to CTDB_RECOVERY_ACTIVE.
 *
 * NOTE(review): client comes from ctdb_reqid_find() and is dereferenced
 * (client->db_id) with no NULL check visible in between, whereas
 * ctdb_control_start_persistent_update() does check for NULL - confirm.
 * Return statements and the else branch of the counter check are not among
 * the lines visible here.
 */
713 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
714 struct ctdb_req_control *c)
716 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
717 struct ctdb_db_context *ctdb_db;
719 ctdb_db = find_ctdb_db(ctdb, client->db_id);
720 if (ctdb_db == NULL) {
721 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
722 "Unknown database 0x%08x\n", client->db_id));
725 if (!ctdb_db->transaction_active) {
726 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
727 "Database 0x%08x has no transaction commit "
728 "started\n", client->db_id));
732 ctdb_db->transaction_active = false;
735 if (client->num_persistent_updates == 0) {
736 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
738 client->num_persistent_updates--;
741 DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
742 " db_id[0x%08x] - forcing recovery\n",
744 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
750 * Tell whether a transaction is active on this node on the given DB.
/*
 * Control handler that reports on the transaction state of database db_id:
 * it compares db_id with the database recorded for the calling client and
 * tests ctdb_db->transaction_active.
 *
 * NOTE(review): the values returned by the two branches are not among the
 * lines visible here.  client comes from ctdb_reqid_find() and
 * client->db_id is read with no NULL check visible - confirm.
 */
752 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
753 struct ctdb_req_control *c,
756 struct ctdb_db_context *ctdb_db;
757 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
759 ctdb_db = find_ctdb_db(ctdb, db_id);
761 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
/* the caller itself is the client recorded against this database */
765 if (client->db_id == db_id) {
769 if (ctdb_db->transaction_active) {
777 backwards compatibility:
779 start a persistent store operation. passing both the key, header and
780 data to the daemon. If the client disconnects before it has issued
781 a persistent_update call to the daemon we trigger a full recovery
782 to ensure the databases are brought back in sync.
783 for now we ignore the recdata that the client has passed to us.
/*
 * Control handler that increments num_persistent_updates for the client
 * identified by c->client_id; an unknown client is logged as an error.
 * Return statements are not among the lines visible here.
 */
785 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
786 struct ctdb_req_control *c,
789 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
791 if (client == NULL) {
792 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
796 client->num_persistent_updates++;
802 backwards compatibility:
804 called to tell ctdbd that it is no longer doing a persistent update
/*
 * Control handler that decrements num_persistent_updates for the client
 * identified by c->client_id, never going below zero; an unknown client is
 * logged as an error.  Return statements are not among the lines visible
 * here.
 */
806 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
807 struct ctdb_req_control *c,
810 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
812 if (client == NULL) {
813 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
/* guard against underflow when no update was registered */
817 if (client->num_persistent_updates > 0) {
818 client->num_persistent_updates--;
826 backwards compatibility:
828 single record variant of ctdb_control_trans2_commit for older clients
/*
 * Single-record entry point: wraps the one struct ctdb_rec_data carried in
 * recdata into a marshall buffer and hands it to
 * ctdb_control_trans2_commit().
 *
 * ctdb        - daemon context
 * c           - the incoming control; also the talloc parent of the buffer
 * recdata     - payload, read as a struct ctdb_rec_data
 * async_reply - passed through to ctdb_control_trans2_commit()
 *
 * NOTE(review): rec->keylen and rec->datalen are read for the size check
 * below with no prior check visible that recdata.dsize covers the fixed
 * part of struct ctdb_rec_data, and the sum of the lengths could wrap -
 * confirm the payload is validated.
 */
830 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
831 struct ctdb_req_control *c,
832 TDB_DATA recdata, bool *async_reply)
834 struct ctdb_marshall_buffer *m;
835 struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
/* the payload must be exactly header + key + data */
838 if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
839 rec->keylen + rec->datalen) {
840 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
/* key bytes come first in rec->data, the value follows directly after */
844 key.dptr = &rec->data[0];
845 key.dsize = rec->keylen;
846 data.dptr = &rec->data[rec->keylen];
847 data.dsize = rec->datalen;
/*
 * NOTE(review): rec->reqid is passed twice; presumably it carries the
 * database id for this legacy control - verify against ctdb_marshall_add().
 */
849 m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
850 CTDB_NO_MEMORY(ctdb, m);
852 return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
/*
 * Fetch the record stored under CTDB_DB_SEQNUM_KEY in database db_id and
 * copy its 64-bit value to *seqnum.  The record is accepted only when its
 * size is exactly sizeof(uint64_t).
 *
 * NOTE(review): the remaining parameters, the error branches and the return
 * statements are not among the lines visible here.
 */
855 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
860 struct ctdb_db_context *ctdb_db;
861 const char *keyname = CTDB_DB_SEQNUM_KEY;
/* scratch context for the fetched record; released on the last line */
864 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
866 ctdb_db = find_ctdb_db(ctdb, db_id);
868 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
/* the key length includes the terminating NUL of the key name */
873 key.dptr = (uint8_t *)discard_const(keyname);
874 key.dsize = strlen(keyname) + 1;
876 ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
881 if (data.dsize != sizeof(uint64_t)) {
886 *seqnum = *(uint64_t *)data.dptr;
889 talloc_free(mem_ctx);
894 * Get the sequence number of a persistent database.
/*
 * Control handler that returns the sequence number of the database whose id
 * is carried in indata: reads a uint32_t db id, calls ctdb_get_db_seqnum()
 * and allocates an 8-byte reply buffer in outdata.
 *
 * NOTE(review): the remaining parameters, the error branches and the end of
 * this function are not among the lines visible here, and no check of
 * indata.dsize is visible before indata.dptr is dereferenced.
 */
896 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
904 db_id = *(uint32_t *)indata.dptr;
905 ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
910 outdata->dsize = sizeof(uint64_t);
911 outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
912 if (outdata->dptr == NULL) {
/*
 * NOTE(review): dptr is assigned through a (uint8_t *) cast above, so this
 * store appears to write only the lowest byte of seqnum into the zeroed
 * buffer; a 64-bit store or a memcpy of sizeof(uint64_t) bytes was probably
 * intended - confirm.
 */
917 *(outdata->dptr) = seqnum;