4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
/*
 * Bookkeeping for one persistent commit that is being sent to the nodes
 * of the cluster as CTDB_CONTROL_UPDATE_RECORD requests.
 *
 * The functions in this file also use state->status, state->errormsg and
 * state->num_pending; the declarations of those members are not among the
 * lines visible here.
 */
29 struct ctdb_persistent_state {
30 struct ctdb_context *ctdb;
31 struct ctdb_db_context *ctdb_db; /* used by trans3_commit */
/* client control that is answered later; re-parented with talloc_steal(state, c) */
32 struct ctdb_req_control *c;
/* compared in ctdb_persistent_callback() to pick ALLFAIL, SOMEFAIL or SUCCESS */
36 uint32_t num_failed, num_sent;
40 1) all nodes fail, and all nodes reply
41 2) some nodes fail, all nodes reply
47 called when a node has acknowledged a ctdb_control_update_record call
/*
 * Completion callback passed to ctdb_daemon_send_control() for every
 * CTDB_CONTROL_UPDATE_RECORD sent by the commit controls in this file.
 * private_data is the struct ctdb_persistent_state of that commit.
 *
 * Behaviour that can be read from the visible lines:
 *  - a reply arriving while recovery_mode is not CTDB_RECOVERY_NORMAL is
 *    logged as ignored (the statement following the log is not visible,
 *    presumably an early return);
 *  - a failing reply is logged, its status and errormsg are recorded in
 *    the state and this node is switched to CTDB_RECOVERY_ACTIVE so that
 *    recovery finishes the transaction;
 *  - state->num_pending is tested against zero before the final reply;
 *  - the final reply carries CTDB_TRANS2_COMMIT_ALLFAIL, _SOMEFAIL or
 *    _SUCCESS depending on num_failed versus num_sent.
 *
 * NOTE(review): state->errormsg keeps the pointer handed to this callback
 * without copying it. Confirm that the string outlives this call if it is
 * ever passed to ctdb_request_control_reply() from a later invocation.
 */
49 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
50 int32_t status, TDB_DATA data,
54 struct ctdb_persistent_state *state = talloc_get_type(private_data,
55 struct ctdb_persistent_state);
56 enum ctdb_trans2_commit_error etype;
58 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
59 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
60 "during recovery\n"));
65 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
66 status, errormsg?errormsg:"no error message given"));
67 state->status = status;
68 state->errormsg = errormsg;
72 * If a node failed to complete the update_record control,
73 * then either a recovery is already running or something
74 * bad is going on. So trigger a recovery and let the
75 * recovery finish the transaction, sending back the reply
76 * for the trans3_commit control to the client.
78 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
84 if (state->num_pending != 0) {
/* all-failed is tested first, so SOMEFAIL means a strict subset failed */
88 if (state->num_failed == state->num_sent) {
89 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
90 } else if (state->num_failed != 0) {
91 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
93 etype = CTDB_TRANS2_COMMIT_SUCCESS;
96 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
101 called if persistent store times out
/*
 * Timed-event handler armed with event_add_timed() by the commit controls
 * in this file; private_data is the struct ctdb_persistent_state.
 *
 * While recovery_mode is not CTDB_RECOVERY_NORMAL the timeout is logged as
 * ignored (the statement after the log is not visible, presumably an early
 * return). Otherwise the waiting client control is answered with
 * CTDB_TRANS2_COMMIT_TIMEOUT and a fixed error string.
 */
103 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te,
104 struct timeval t, void *private_data)
106 struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
108 if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
109 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
110 "timeout during recovery\n"));
114 ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT,
115 "timeout in ctdb_persistent_state");
121 store a set of persistent records - called from a ctdb client when it has updated
122 some records in a persistent database. The client will have the record
123 locked for the duration of this call. The client is the dmaster when
/*
 * Control handler that sends a set of persistent-record updates to the
 * other nodes as CTDB_CONTROL_UPDATE_RECORD and arranges for the client to
 * be answered once the replies are in. The case labels below show that it
 * serves CTDB_CONTROL_PERSISTENT_STORE, CTDB_CONTROL_TRANS2_COMMIT and
 * CTDB_CONTROL_TRANS2_COMMIT_RETRY.
 *
 * recdata is interpreted as a struct ctdb_marshall_buffer and m->db_id
 * selects the database.
 *
 * Checks visible in these lines, in order: the database must be known, the
 * client must be resolvable from c->client_id, the database must not carry
 * an unhealthy_reason, the per-opcode transaction bookkeeping must pass,
 * and recovery must not be active.
 *
 * The send loop skips inactive nodes and the local node, counts each
 * successful send in state->num_pending, re-parents c onto state and arms
 * a timeout of ctdb->tunable.control_timeout seconds.
 *
 * NOTE(review): m->db_id is read from recdata.dptr and no check of
 * recdata.dsize is visible in these lines - confirm the buffer size is
 * validated before this handler runs.
 * NOTE(review): the recovery-mode test comes after the bookkeeping that
 * sets transaction_active, client->db_id and num_persistent_updates.
 * Confirm that a rejection at that point leaves them as intended.
 */
126 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
127 struct ctdb_req_control *c,
128 TDB_DATA recdata, bool *async_reply)
130 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
131 struct ctdb_persistent_state *state;
133 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
134 struct ctdb_db_context *ctdb_db;
136 ctdb_db = find_ctdb_db(ctdb, m->db_id);
137 if (ctdb_db == NULL) {
138 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
139 "Unknown database db_id[0x%08x]\n", m->db_id));
143 if (client == NULL) {
144 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
148 if (ctdb_db->unhealthy_reason) {
149 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
150 ctdb_db->db_name, ctdb_db->unhealthy_reason));
154 /* handling num_persistent_updates is a bit strange -
156 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
157 They don't expect num_persistent_updates to be used at all
159 2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
160 this commit to then decrement it
162 3) new clients which use TRANS2 commit functions, and
163 expect this function to increment the counter, and
164 then have it decremented in ctdb_control_trans2_error
165 or ctdb_control_trans2_finished
168 case CTDB_CONTROL_PERSISTENT_STORE:
169 if (ctdb_db->transaction_active) {
170 DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
171 "transaction is active on database "
172 "db_id[0x%08x] - refusing persistent "
173 " store for client id[0x%08x]\n",
174 ctdb_db->db_id, client->client_id));
177 if (client->num_persistent_updates > 0) {
178 client->num_persistent_updates--;
181 case CTDB_CONTROL_TRANS2_COMMIT:
182 if (ctdb_db->transaction_active) {
183 DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
184 " already a transaction commit "
185 "active on db_id[0x%08x] - forbidding "
186 "client_id[0x%08x] to commit\n",
187 ctdb_db->db_id, client->client_id));
190 if (client->db_id != 0) {
191 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
192 "client-db_id[0x%08x] != 0 "
193 "(client_id[0x%08x])\n",
194 client->db_id, client->client_id));
197 client->num_persistent_updates++;
198 ctdb_db->transaction_active = true;
199 client->db_id = m->db_id;
200 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
201 " commit transaction on db id[0x%08x]\n",
202 client->client_id, client->db_id));
204 case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
205 /* already updated from the first commit */
206 if (client->db_id != m->db_id) {
207 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
208 "retry: client-db_id[0x%08x] != "
209 "db_id[0x%08x] (client_id[0x%08x])\n",
211 m->db_id, client->client_id));
214 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
215 "transaction commit retry on "
217 client->client_id, client->db_id));
221 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
222 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
/* the state is parented to ctdb; c is re-parented onto it further down */
226 state = talloc_zero(ctdb, struct ctdb_persistent_state);
227 CTDB_NO_MEMORY(ctdb, state);
/* walk the vnn map, so only nodes listed there are contacted */
232 for (i=0;i<ctdb->vnn_map->size;i++) {
233 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
236 /* only send to active nodes */
237 if (node->flags & NODE_FLAGS_INACTIVE) {
241 /* don't send to ourselves */
242 if (node->pnn == ctdb->pnn) {
246 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
247 c->client_id, 0, recdata,
248 ctdb_persistent_callback, state);
250 DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
255 state->num_pending++;
/* zero pending means no request went out; the handling of that case is not visible here */
259 if (state->num_pending == 0) {
264 /* we need to wait for the replies */
267 /* need to keep the control structure around */
268 talloc_steal(state, c);
270 /* but we won't wait forever */
271 event_add_timed(ctdb->ev, state,
272 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
273 ctdb_persistent_store_timeout, state);
280 * Store a set of persistent records.
281 * This is used to roll out a transaction to all nodes.
/*
 * Control handler that rolls a transaction out to the nodes: recdata (a
 * struct ctdb_marshall_buffer) is sent as CTDB_CONTROL_UPDATE_RECORD to
 * every node of the vnn map that is not flagged NODE_FLAGS_INACTIVE.
 *
 * Checks visible in these lines, in order: recovery must not be active,
 * the database named by m->db_id must be known, and the client must be
 * resolvable from c->client_id.
 *
 * Each successful send is counted in state->num_pending. Afterwards c is
 * re-parented onto state and a timeout of ctdb->tunable.control_timeout
 * seconds is armed with ctdb_persistent_store_timeout() as handler.
 *
 * NOTE(review): unlike ctdb_control_trans2_commit() no skip of the local
 * node is visible in this loop - presumably intentional, confirm.
 * NOTE(review): m->db_id is read from recdata.dptr and no check of
 * recdata.dsize is visible in these lines.
 */
283 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
284 struct ctdb_req_control *c,
285 TDB_DATA recdata, bool *async_reply)
287 struct ctdb_client *client;
288 struct ctdb_persistent_state *state;
290 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
291 struct ctdb_db_context *ctdb_db;
293 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
294 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
298 ctdb_db = find_ctdb_db(ctdb, m->db_id);
299 if (ctdb_db == NULL) {
300 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
301 "Unknown database db_id[0x%08x]\n", m->db_id));
305 client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
306 if (client == NULL) {
307 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
308 "to a client. Returning error\n"));
/* the state is parented to ctdb; c is re-parented onto it further down */
312 state = talloc_zero(ctdb, struct ctdb_persistent_state);
313 CTDB_NO_MEMORY(ctdb, state);
318 for (i = 0; i < ctdb->vnn_map->size; i++) {
319 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
322 /* only send to active nodes */
323 if (node->flags & NODE_FLAGS_INACTIVE) {
327 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
328 CTDB_CONTROL_UPDATE_RECORD,
329 c->client_id, 0, recdata,
330 ctdb_persistent_callback,
333 DEBUG(DEBUG_ERR,("Unable to send "
334 "CTDB_CONTROL_UPDATE_RECORD "
335 "to pnn %u\n", node->pnn));
340 state->num_pending++;
/* zero pending means no request went out; the handling of that case is not visible here */
344 if (state->num_pending == 0) {
349 /* we need to wait for the replies */
352 /* need to keep the control structure around */
353 talloc_steal(state, c);
355 /* but we won't wait forever */
356 event_add_timed(ctdb->ev, state,
357 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
358 ctdb_persistent_store_timeout, state);
/*
 * Context of one local write of a marshalled record set, created in
 * ctdb_control_update_record() and handed to the child writer.
 */
364 struct ctdb_persistent_write_state {
/* database whose local tdb receives the records */
365 struct ctdb_db_context *ctdb_db;
/* marshalled records iterated by ctdb_persistent_store() */
366 struct ctdb_marshall_buffer *m;
/* control that is answered when the write finishes or times out */
367 struct ctdb_req_control *c;
372 called from a child process to write the data
/*
 * Write every record of state->m into the local tdb of state->ctdb_db
 * inside a single tdb transaction. ctdb_childwrite() calls this in the
 * forked child.
 *
 * For each record the currently stored header and data are fetched first.
 * The record is rejected when the stored RSN is not lower than the new
 * one and the stored data differs from the new data. A record with the
 * same data is let through regardless of RSN.
 *
 * The transaction is committed after the loop; a
 * tdb_transaction_cancel() call is visible at the end of the function,
 * presumably the shared failure exit.
 */
374 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
377 struct ctdb_rec_data *rec = NULL;
378 struct ctdb_marshall_buffer *m = state->m;
380 ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
382 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
383 state->ctdb_db->db_id));
387 for (i=0;i<m->count;i++) {
388 struct ctdb_ltdb_header oldheader;
389 struct ctdb_ltdb_header header;
390 TDB_DATA key, data, olddata;
/* per-record scratch context; it owns the old data fetched below */
391 TALLOC_CTX *tmp_ctx = talloc_new(state);
393 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
396 DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
397 i, state->ctdb_db->db_id));
398 talloc_free(tmp_ctx);
402 /* fetch the old header and ensure the rsn is less than the new rsn */
403 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
405 DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
406 state->ctdb_db->db_id));
407 talloc_free(tmp_ctx);
/* memcmp is only reached when both sizes are equal, so data.dsize bounds both buffers */
411 if (oldheader.rsn >= header.rsn &&
412 (olddata.dsize != data.dsize ||
413 memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
414 DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
415 state->ctdb_db->db_id,
416 (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
417 talloc_free(tmp_ctx);
/* the old data is no longer needed once the comparison is done */
421 talloc_free(tmp_ctx);
423 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
425 DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
426 state->ctdb_db->db_id));
431 ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
433 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
434 state->ctdb_db->db_id));
441 tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
447 called when the child has completed the persistent write
/*
 * Completion callback handed to ctdb_childwrite() by
 * ctdb_control_update_record(). private_data is the
 * struct ctdb_persistent_write_state. The waiting control is answered with
 * the status reported for the write and without an error message.
 */
450 static void ctdb_persistent_write_callback(int status, void *private_data)
452 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
453 struct ctdb_persistent_write_state);
456 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
462 called if our lockwait child times out
/*
 * Timed-event handler armed by ctdb_control_update_record(). private_data
 * is the struct ctdb_persistent_write_state. The waiting control is
 * answered with status -1 and a fixed timeout message.
 */
464 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
465 struct timeval t, void *private_data)
467 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
468 struct ctdb_persistent_write_state);
469 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
/*
 * Handle for one forked child writer created by ctdb_childwrite().
 *
 * The functions in this file also use h->child, h->fd[] and
 * h->private_data; the declarations of those members are not among the
 * lines visible here.
 */
473 struct childwrite_handle {
474 struct ctdb_context *ctdb;
475 struct ctdb_db_context *ctdb_db;
/* read-side fd event on the pipe, handled by childwrite_handler() */
476 struct fd_event *fde;
/* completion callback; childwrite_handler() copies it before the handle is released */
480 void (*callback)(int, void *);
/* taken with timeval_current() after the fd event is set up; used for the latency statistic */
481 struct timeval start_time;
/*
 * Talloc destructor installed on the handle by ctdb_childwrite(). It
 * takes the write out of the pending_childwrite_calls statistic and kills
 * the child with SIGKILL. childwrite_handler() removes this destructor
 * before releasing the handle, because it does both steps itself.
 */
484 static int childwrite_destructor(struct childwrite_handle *h)
486 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
487 kill(h->child, SIGKILL);
491 /* called when the child process has finished writing the record to the
// fd-event handler registered by ctdb_childwrite() on the read end of the
// pipe; private_data is the struct childwrite_handle.
//
// The callback, its argument and the child pid are copied into locals
// first, because the handle is re-parented onto a temporary context that is
// freed at the end of this function.
//
// The latency and pending statistics are updated here and the destructor is
// cleared, so releasing the handle does not decrement the statistic or
// signal the child a second time through childwrite_destructor().
//
// One status byte is read from the pipe and a failed read is logged. The
// call that hands the result to the callback is not among the visible lines.
// Finally the child is killed with SIGKILL and the temporary context freed.
494 static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
495 uint16_t flags, void *private_data)
497 struct childwrite_handle *h = talloc_get_type(private_data,
498 struct childwrite_handle);
499 void *p = h->private_data;
500 void (*callback)(int, void *) = h->callback;
501 pid_t child = h->child;
502 TALLOC_CTX *tmp_ctx = talloc_new(ev);
506 CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
507 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
509 /* the handle needs to go away when the context is gone - when
510 the handle goes away this implicitly closes the pipe, which
512 talloc_steal(tmp_ctx, h);
514 talloc_set_destructor(h, NULL);
516 ret = read(h->fd[0], &c, 1);
518 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
524 kill(child, SIGKILL);
525 talloc_free(tmp_ctx);
528 /* this creates a child process which will take out a tdb transaction
529 and write the record to the database.
// Fork a child that writes the records of state into the local database
// and report the result to callback from the event loop.
//
// ctdb_db   database to write to
// callback  called with the status and state as private data
// state     write context; the returned handle is a talloc child of it
//
// Both childwrite statistics are incremented up front. Every visible
// failure branch (allocation, pipe, fork, fd event) decrements
// pending_childwrite_calls again.
//
// Child side: closes the read end, tags debug output with the database
// name, runs ctdb_persistent_store(), writes one status byte to the pipe
// and then loops for as long as kill(parent, 0) does not fail with ESRCH,
// that is until the parent process is gone.
//
// Parent side: closes the write end, marks the read end close-on-exec,
// installs childwrite_destructor(), registers childwrite_handler() for
// EVENT_FD_READ with auto-close and records the start time.
//
// NOTE(review): the result of write() in the child is not checked in the
// visible lines. If that byte is never written the parent only learns of it
// through the failed read in childwrite_handler().
531 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
532 void (*callback)(int, void *private_data),
533 struct ctdb_persistent_write_state *state)
535 struct childwrite_handle *result;
537 pid_t parent = getpid();
539 CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
540 CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
542 if (!(result = talloc_zero(state, struct childwrite_handle))) {
543 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
547 ret = pipe(result->fd);
551 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
555 result->child = ctdb_fork(ctdb_db->ctdb);
557 if (result->child == (pid_t)-1) {
558 close(result->fd[0]);
559 close(result->fd[1]);
561 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
565 result->callback = callback;
566 result->private_data = state;
567 result->ctdb = ctdb_db->ctdb;
568 result->ctdb_db = ctdb_db;
570 if (result->child == 0) {
573 close(result->fd[0]);
574 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
575 ret = ctdb_persistent_store(state);
577 DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
581 write(result->fd[1], &c, 1);
583 /* make sure we die when our parent dies */
584 while (kill(parent, 0) == 0 || errno != ESRCH) {
590 close(result->fd[1]);
591 set_close_on_exec(result->fd[0]);
593 talloc_set_destructor(result, childwrite_destructor);
595 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
597 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
598 EVENT_FD_READ, childwrite_handler,
600 if (result->fde == NULL) {
602 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
605 tevent_fd_set_auto_close(result->fde);
607 result->start_time = timeval_current();
613 update a record on this node if the new record has a higher rsn than the
/*
 * Control handler for CTDB_CONTROL_UPDATE_RECORD: apply the marshalled
 * records in recdata to the local copy of the database through a forked
 * child writer.
 *
 * Checks visible in these lines, in order: recovery must not be active,
 * the database named by m->db_id must be known and it must not carry an
 * unhealthy_reason.
 *
 * A write state is allocated, ctdb_childwrite() is started with
 * ctdb_persistent_write_callback() as completion, c is re-parented onto
 * the state and a timeout of ctdb->tunable.control_timeout seconds is
 * armed with ctdb_persistent_lock_timeout() as handler.
 *
 * NOTE(review): the state comes from talloc(), not talloc_zero(), and only
 * the assignment of state->ctdb_db is visible here. Confirm that state->m
 * and state->c are set before ctdb_childwrite() and the callbacks read
 * them.
 * NOTE(review): m->db_id is read from recdata.dptr and no check of
 * recdata.dsize is visible in these lines.
 */
616 int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
617 struct ctdb_req_control *c, TDB_DATA recdata,
620 struct ctdb_db_context *ctdb_db;
621 struct ctdb_persistent_write_state *state;
622 struct childwrite_handle *handle;
623 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
625 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
626 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
630 ctdb_db = find_ctdb_db(ctdb, m->db_id);
631 if (ctdb_db == NULL) {
632 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
636 if (ctdb_db->unhealthy_reason) {
637 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
638 ctdb_db->db_name, ctdb_db->unhealthy_reason));
642 state = talloc(ctdb, struct ctdb_persistent_write_state);
643 CTDB_NO_MEMORY(ctdb, state);
645 state->ctdb_db = ctdb_db;
649 /* create a child process to take out a transaction and
652 handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
653 if (handle == NULL) {
654 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
659 /* we need to wait for the replies */
662 /* need to keep the control structure around */
663 talloc_steal(state, c);
665 /* but we won't wait forever */
666 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
667 ctdb_persistent_lock_timeout, state);
674 called when a client has finished a local commit in a transaction to
675 a persistent database
/*
 * Control handler called by a client after it finished the local commit
 * of a transaction on a persistent database.
 *
 * The database is looked up from client->db_id and must currently have
 * transaction_active set. The flag is then cleared and the client counter
 * num_persistent_updates is decremented. If that counter is already zero
 * the inconsistency is logged and recovery_mode is set to
 * CTDB_RECOVERY_ACTIVE.
 *
 * NOTE(review): client is dereferenced right after ctdb_reqid_find() and
 * no NULL check is visible in these lines, although other handlers in
 * this file test the result for NULL - confirm and add a guard if needed.
 * NOTE(review): no reset of client->db_id is visible here;
 * ctdb_control_trans2_commit() refuses a new commit while it is non-zero.
 */
677 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
678 struct ctdb_req_control *c)
680 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
681 struct ctdb_db_context *ctdb_db;
683 ctdb_db = find_ctdb_db(ctdb, client->db_id);
684 if (ctdb_db == NULL) {
685 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
686 "Unknown database 0x%08x\n", client->db_id));
689 if (!ctdb_db->transaction_active) {
690 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
691 "Database 0x%08x has no transaction commit "
692 "started\n", client->db_id));
696 ctdb_db->transaction_active = false;
/* a zero counter here means the bookkeeping is out of step, so recovery is forced */
699 if (client->num_persistent_updates == 0) {
700 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
701 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
702 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
705 client->num_persistent_updates--;
707 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
708 "transaction commit db_id[0x%08x]\n",
709 client->client_id, ctdb_db->db_id));
715 called when a client gets an error committing its database
716 during a transaction commit
/*
 * Control handler called by a client that hit an error while committing
 * its database during a transaction commit.
 *
 * The database is looked up from client->db_id and must currently have
 * transaction_active set. The flag is cleared, the client counter
 * num_persistent_updates is adjusted, the error is logged and
 * recovery_mode is set to CTDB_RECOVERY_ACTIVE.
 *
 * NOTE(review): client is dereferenced right after ctdb_reqid_find() and
 * no NULL check is visible in these lines, although other handlers in
 * this file test the result for NULL - confirm and add a guard if needed.
 */
718 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
719 struct ctdb_req_control *c)
721 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
722 struct ctdb_db_context *ctdb_db;
724 ctdb_db = find_ctdb_db(ctdb, client->db_id);
725 if (ctdb_db == NULL) {
726 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
727 "Unknown database 0x%08x\n", client->db_id));
730 if (!ctdb_db->transaction_active) {
731 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
732 "Database 0x%08x has no transaction commit "
733 "started\n", client->db_id));
737 ctdb_db->transaction_active = false;
740 if (client->num_persistent_updates == 0) {
741 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
743 client->num_persistent_updates--;
746 DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
747 " db_id[0x%08x] - forcing recovery\n",
749 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
755 * Tell whether a transaction is active on this node on the given DB.
/*
 * Control handler that reports whether a transaction is active on the
 * database db_id on this node.
 *
 * Two situations are distinguished: the calling client itself is bound to
 * the database (client->db_id == db_id), or the database has
 * transaction_active set. The values returned for each case are not among
 * the visible lines.
 *
 * NOTE(review): client is dereferenced and no NULL check of the
 * ctdb_reqid_find() result is visible in these lines.
 */
757 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
758 struct ctdb_req_control *c,
761 struct ctdb_db_context *ctdb_db;
762 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
764 ctdb_db = find_ctdb_db(ctdb, db_id);
766 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
770 if (client->db_id == db_id) {
774 if (ctdb_db->transaction_active) {
782 backwards compatibility:
784 start a persistent store operation. passing both the key, header and
785 data to the daemon. If the client disconnects before it has issued
786 a persistent_update call to the daemon we trigger a full recovery
787 to ensure the databases are brought back in sync.
788 for now we ignore the recdata that the client has passed to us.
/*
 * Backwards-compatibility control: note that the calling client has begun
 * a persistent update by incrementing its num_persistent_updates counter.
 * An unknown client is logged as an error.
 */
790 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
791 struct ctdb_req_control *c,
794 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
796 if (client == NULL) {
797 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
801 client->num_persistent_updates++;
807 backwards compatibility:
809 called to tell ctdbd that it is no longer doing a persistent update
/*
 * Backwards-compatibility control: note that the calling client is no
 * longer doing a persistent update. The client counter
 * num_persistent_updates is decremented when it is positive. An unknown
 * client is logged as an error.
 */
811 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
812 struct ctdb_req_control *c,
815 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
817 if (client == NULL) {
818 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
/* guarded so the counter does not drop below zero */
822 if (client->num_persistent_updates > 0) {
823 client->num_persistent_updates--;
831 backwards compatibility:
833 single record variant of ctdb_control_trans2_commit for older clients
/*
 * Backwards-compatibility control for older clients that send a single
 * record. recdata is interpreted as one struct ctdb_rec_data whose data
 * area holds the key followed by the value. The record is wrapped into a
 * marshall buffer allocated on c and passed on to
 * ctdb_control_trans2_commit().
 *
 * The total size must equal the fixed part of the record plus keylen plus
 * datalen, otherwise the request is logged as bad.
 *
 * NOTE(review): rec->keylen and rec->datalen are read in the size test
 * itself, and no earlier check that recdata.dsize covers the fixed part of
 * the record is visible in these lines.
 * NOTE(review): rec->reqid is passed for two consecutive arguments of
 * ctdb_marshall_add() - presumably it carries the database id for this
 * legacy control, confirm against the ctdb_marshall_add() prototype.
 */
835 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
836 struct ctdb_req_control *c,
837 TDB_DATA recdata, bool *async_reply)
839 struct ctdb_marshall_buffer *m;
840 struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
843 if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
844 rec->keylen + rec->datalen) {
845 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
/* the key occupies the first keylen bytes of data, the value follows it */
849 key.dptr = &rec->data[0];
850 key.dsize = rec->keylen;
851 data.dptr = &rec->data[rec->keylen];
852 data.dsize = rec->datalen;
854 m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
855 CTDB_NO_MEMORY(ctdb, m);
857 return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
/*
 * Read the sequence number record of the database db_id into *seqnum.
 *
 * The record is stored under the key CTDB_DB_SEQNUM_KEY and must be
 * exactly sizeof(uint64_t) bytes long. The fetched data lives on a
 * temporary talloc context that is freed before returning.
 */
860 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
865 struct ctdb_db_context *ctdb_db;
866 const char *keyname = CTDB_DB_SEQNUM_KEY;
869 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
871 ctdb_db = find_ctdb_db(ctdb, db_id);
873 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
/* the key length includes the terminating NUL byte of the key name */
878 key.dptr = (uint8_t *)discard_const(keyname);
879 key.dsize = strlen(keyname) + 1;
/* no header is requested, only the record data */
881 ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
886 if (data.dsize != sizeof(uint64_t)) {
891 *seqnum = *(uint64_t *)data.dptr;
894 talloc_free(mem_ctx);
899 * Get the sequence number of a persistent database.
/*
 * Control handler that returns the sequence number of a persistent
 * database. The database id is taken from the first four bytes of indata
 * and the result buffer of sizeof(uint64_t) bytes is allocated on outdata.
 *
 * NOTE(review): outdata->dptr is assigned through a (uint8_t *) cast, so
 * the final store through *(outdata->dptr) appears to write a single byte
 * of seqnum while dsize announces eight. Confirm the type of dptr and
 * store through a uint64_t pointer or memcpy() if so.
 * NOTE(review): indata.dptr is dereferenced as uint32_t and no check of
 * indata.dsize is visible in these lines.
 */
901 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
909 db_id = *(uint32_t *)indata.dptr;
910 ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
915 outdata->dsize = sizeof(uint64_t);
916 outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
917 if (outdata->dptr == NULL) {
922 *(outdata->dptr) = seqnum;