4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
29 struct ctdb_persistent_state {
30 struct ctdb_context *ctdb;
31 struct ctdb_db_context *ctdb_db; /* used by trans3_commit */
32 struct ctdb_req_control *c;
36 uint32_t num_failed, num_sent;
40 1) all nodes fail, and all nodes reply
41 2) some nodes fail, all nodes reply
47 called when a node has acknowledged a ctdb_control_update_record call
49 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
50 int32_t status, TDB_DATA data,
54 struct ctdb_persistent_state *state = talloc_get_type(private_data,
55 struct ctdb_persistent_state);
56 enum ctdb_trans2_commit_error etype;
58 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
59 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
60 "during recovery\n"));
65 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
66 status, errormsg?errormsg:"no error message given"));
67 state->status = status;
68 state->errormsg = errormsg;
72 * If a node failed to complete the update_record control,
73 * then either a recovery is already running or something
74 * bad is going on. So trigger a recovery and let the
75 * recovery finish the transaction, sending back the reply
76 * for the trans3_commit control to the client.
78 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
84 if (state->num_pending != 0) {
88 if (state->num_failed == state->num_sent) {
89 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
90 } else if (state->num_failed != 0) {
91 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
93 etype = CTDB_TRANS2_COMMIT_SUCCESS;
96 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
101 called if persistent store times out
103 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te,
104 struct timeval t, void *private_data)
106 struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
108 if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
109 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
110 "timeout during recovery\n"));
114 ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT,
115 "timeout in ctdb_persistent_state");
121 store a set of persistent records - called from a ctdb client when it has updated
122 some records in a persistent database. The client will have the record
123 locked for the duration of this call. The client is the dmaster when
126 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
127 struct ctdb_req_control *c,
128 TDB_DATA recdata, bool *async_reply)
130 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
131 struct ctdb_persistent_state *state;
133 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
134 struct ctdb_db_context *ctdb_db;
136 ctdb_db = find_ctdb_db(ctdb, m->db_id);
137 if (ctdb_db == NULL) {
138 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
139 "Unknown database db_id[0x%08x]\n", m->db_id));
143 if (client == NULL) {
144 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
148 if (ctdb_db->unhealthy_reason) {
149 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
150 ctdb_db->db_name, ctdb_db->unhealthy_reason));
154 /* handling num_persistent_updates is a bit strange -
156 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
157 They don't expect num_persistent_updates to be used at all
159 2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
160 this commit to then decrement it
162 3) new clients which use TRANS2 commit functions, and
163 expect this function to increment the counter, and
164 then have it decremented in ctdb_control_trans2_error
165 or ctdb_control_trans2_finished
168 case CTDB_CONTROL_PERSISTENT_STORE:
169 if (ctdb_db->transaction_active) {
170 DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
171 "transaction is active on database "
172 "db_id[0x%08x] - refusing persistent "
173 " store for client id[0x%08x]\n",
174 ctdb_db->db_id, client->client_id));
177 if (client->num_persistent_updates > 0) {
178 client->num_persistent_updates--;
181 case CTDB_CONTROL_TRANS2_COMMIT:
182 if (ctdb_db->transaction_active) {
183 DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
184 " already a transaction commit "
185 "active on db_id[0x%08x] - forbidding "
186 "client_id[0x%08x] to commit\n",
187 ctdb_db->db_id, client->client_id));
190 if (client->db_id != 0) {
191 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
192 "client-db_id[0x%08x] != 0 "
193 "(client_id[0x%08x])\n",
194 client->db_id, client->client_id));
197 client->num_persistent_updates++;
198 ctdb_db->transaction_active = true;
199 client->db_id = m->db_id;
200 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
201 " commit transaction on db id[0x%08x]\n",
202 client->client_id, client->db_id));
204 case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
205 /* already updated from the first commit */
206 if (client->db_id != m->db_id) {
207 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
208 "retry: client-db_id[0x%08x] != "
209 "db_id[0x%08x] (client_id[0x%08x])\n",
211 m->db_id, client->client_id));
214 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
215 "transaction commit retry on "
217 client->client_id, client->db_id));
221 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
222 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
226 state = talloc_zero(ctdb, struct ctdb_persistent_state);
227 CTDB_NO_MEMORY(ctdb, state);
232 for (i=0;i<ctdb->vnn_map->size;i++) {
233 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
236 /* only send to active nodes */
237 if (node->flags & NODE_FLAGS_INACTIVE) {
241 /* don't send to ourselves */
242 if (node->pnn == ctdb->pnn) {
246 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
247 c->client_id, 0, recdata,
248 ctdb_persistent_callback, state);
250 DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
255 state->num_pending++;
259 if (state->num_pending == 0) {
264 /* we need to wait for the replies */
267 /* need to keep the control structure around */
268 talloc_steal(state, c);
270 /* but we won't wait forever */
271 event_add_timed(ctdb->ev, state,
272 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
273 ctdb_persistent_store_timeout, state);
278 static int ctdb_persistent_state_destructor(struct ctdb_persistent_state *state)
280 if (state->ctdb_db != NULL) {
281 state->ctdb_db->persistent_state = NULL;
288 * Store a set of persistent records.
289 * This is used to roll out a transaction to all nodes.
291 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
292 struct ctdb_req_control *c,
293 TDB_DATA recdata, bool *async_reply)
295 struct ctdb_client *client;
296 struct ctdb_persistent_state *state;
298 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
299 struct ctdb_db_context *ctdb_db;
301 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
302 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
306 ctdb_db = find_ctdb_db(ctdb, m->db_id);
307 if (ctdb_db == NULL) {
308 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
309 "Unknown database db_id[0x%08x]\n", m->db_id));
313 client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
314 if (client == NULL) {
315 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
316 "to a client. Returning error\n"));
320 ctdb_db->persistent_state = talloc_zero(ctdb_db,
321 struct ctdb_persistent_state);
322 CTDB_NO_MEMORY(ctdb, ctdb_db->persistent_state);
324 state = ctdb_db->persistent_state;
326 state->ctdb_db = ctdb_db;
328 talloc_set_destructor(state, ctdb_persistent_state_destructor);
330 for (i = 0; i < ctdb->vnn_map->size; i++) {
331 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
334 /* only send to active nodes */
335 if (node->flags & NODE_FLAGS_INACTIVE) {
339 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
340 CTDB_CONTROL_UPDATE_RECORD,
341 c->client_id, 0, recdata,
342 ctdb_persistent_callback,
345 DEBUG(DEBUG_ERR,("Unable to send "
346 "CTDB_CONTROL_UPDATE_RECORD "
347 "to pnn %u\n", node->pnn));
352 state->num_pending++;
356 if (state->num_pending == 0) {
361 /* we need to wait for the replies */
364 /* need to keep the control structure around */
365 talloc_steal(state, c);
367 /* but we won't wait forever */
368 event_add_timed(ctdb->ev, state,
369 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
370 ctdb_persistent_store_timeout, state);
376 struct ctdb_persistent_write_state {
377 struct ctdb_db_context *ctdb_db;
378 struct ctdb_marshall_buffer *m;
379 struct ctdb_req_control *c;
384 called from a child process to write the data
386 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
389 struct ctdb_rec_data *rec = NULL;
390 struct ctdb_marshall_buffer *m = state->m;
392 ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
394 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
395 state->ctdb_db->db_id));
399 for (i=0;i<m->count;i++) {
400 struct ctdb_ltdb_header oldheader;
401 struct ctdb_ltdb_header header;
402 TDB_DATA key, data, olddata;
403 TALLOC_CTX *tmp_ctx = talloc_new(state);
405 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
408 DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
409 i, state->ctdb_db->db_id));
410 talloc_free(tmp_ctx);
414 /* fetch the old header and ensure the rsn is less than the new rsn */
415 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
417 DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
418 state->ctdb_db->db_id));
419 talloc_free(tmp_ctx);
423 if (oldheader.rsn >= header.rsn &&
424 (olddata.dsize != data.dsize ||
425 memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
426 DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
427 state->ctdb_db->db_id,
428 (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
429 talloc_free(tmp_ctx);
433 talloc_free(tmp_ctx);
435 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
437 DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
438 state->ctdb_db->db_id));
443 ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
445 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
446 state->ctdb_db->db_id));
453 tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
459 called when we the child has completed the persistent write
462 static void ctdb_persistent_write_callback(int status, void *private_data)
464 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
465 struct ctdb_persistent_write_state);
468 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
474 called if our lockwait child times out
476 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
477 struct timeval t, void *private_data)
479 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
480 struct ctdb_persistent_write_state);
481 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
485 struct childwrite_handle {
486 struct ctdb_context *ctdb;
487 struct ctdb_db_context *ctdb_db;
488 struct fd_event *fde;
492 void (*callback)(int, void *);
493 struct timeval start_time;
496 static int childwrite_destructor(struct childwrite_handle *h)
498 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
499 kill(h->child, SIGKILL);
503 /* called when the child process has finished writing the record to the
506 static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
507 uint16_t flags, void *private_data)
509 struct childwrite_handle *h = talloc_get_type(private_data,
510 struct childwrite_handle);
511 void *p = h->private_data;
512 void (*callback)(int, void *) = h->callback;
513 pid_t child = h->child;
514 TALLOC_CTX *tmp_ctx = talloc_new(ev);
518 CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
519 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
521 /* the handle needs to go away when the context is gone - when
522 the handle goes away this implicitly closes the pipe, which
524 talloc_steal(tmp_ctx, h);
526 talloc_set_destructor(h, NULL);
528 ret = read(h->fd[0], &c, 1);
530 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
536 kill(child, SIGKILL);
537 talloc_free(tmp_ctx);
540 /* this creates a child process which will take out a tdb transaction
541 and write the record to the database.
543 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
544 void (*callback)(int, void *private_data),
545 struct ctdb_persistent_write_state *state)
547 struct childwrite_handle *result;
549 pid_t parent = getpid();
551 CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
552 CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
554 if (!(result = talloc_zero(state, struct childwrite_handle))) {
555 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
559 ret = pipe(result->fd);
563 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
567 result->child = ctdb_fork(ctdb_db->ctdb);
569 if (result->child == (pid_t)-1) {
570 close(result->fd[0]);
571 close(result->fd[1]);
573 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
577 result->callback = callback;
578 result->private_data = state;
579 result->ctdb = ctdb_db->ctdb;
580 result->ctdb_db = ctdb_db;
582 if (result->child == 0) {
585 close(result->fd[0]);
586 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
587 ret = ctdb_persistent_store(state);
589 DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
593 write(result->fd[1], &c, 1);
595 /* make sure we die when our parent dies */
596 while (kill(parent, 0) == 0 || errno != ESRCH) {
602 close(result->fd[1]);
603 set_close_on_exec(result->fd[0]);
605 talloc_set_destructor(result, childwrite_destructor);
607 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
609 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
610 EVENT_FD_READ, childwrite_handler,
612 if (result->fde == NULL) {
614 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
617 tevent_fd_set_auto_close(result->fde);
619 result->start_time = timeval_current();
625 update a record on this node if the new record has a higher rsn than the
628 int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
629 struct ctdb_req_control *c, TDB_DATA recdata,
632 struct ctdb_db_context *ctdb_db;
633 struct ctdb_persistent_write_state *state;
634 struct childwrite_handle *handle;
635 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
637 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
638 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
642 ctdb_db = find_ctdb_db(ctdb, m->db_id);
643 if (ctdb_db == NULL) {
644 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
648 if (ctdb_db->unhealthy_reason) {
649 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
650 ctdb_db->db_name, ctdb_db->unhealthy_reason));
654 state = talloc(ctdb, struct ctdb_persistent_write_state);
655 CTDB_NO_MEMORY(ctdb, state);
657 state->ctdb_db = ctdb_db;
661 /* create a child process to take out a transaction and
664 handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
665 if (handle == NULL) {
666 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
671 /* we need to wait for the replies */
674 /* need to keep the control structure around */
675 talloc_steal(state, c);
677 /* but we won't wait forever */
678 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
679 ctdb_persistent_lock_timeout, state);
686 called when a client has finished a local commit in a transaction to
687 a persistent database
689 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
690 struct ctdb_req_control *c)
692 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
693 struct ctdb_db_context *ctdb_db;
695 ctdb_db = find_ctdb_db(ctdb, client->db_id);
696 if (ctdb_db == NULL) {
697 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
698 "Unknown database 0x%08x\n", client->db_id));
701 if (!ctdb_db->transaction_active) {
702 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
703 "Database 0x%08x has no transaction commit "
704 "started\n", client->db_id));
708 ctdb_db->transaction_active = false;
711 if (client->num_persistent_updates == 0) {
712 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
713 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
714 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
717 client->num_persistent_updates--;
719 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
720 "transaction commit db_id[0x%08x]\n",
721 client->client_id, ctdb_db->db_id));
727 called when a client gets an error committing its database
728 during a transaction commit
730 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
731 struct ctdb_req_control *c)
733 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
734 struct ctdb_db_context *ctdb_db;
736 ctdb_db = find_ctdb_db(ctdb, client->db_id);
737 if (ctdb_db == NULL) {
738 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
739 "Unknown database 0x%08x\n", client->db_id));
742 if (!ctdb_db->transaction_active) {
743 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
744 "Database 0x%08x has no transaction commit "
745 "started\n", client->db_id));
749 ctdb_db->transaction_active = false;
752 if (client->num_persistent_updates == 0) {
753 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
755 client->num_persistent_updates--;
758 DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
759 " db_id[0x%08x] - forcing recovery\n",
761 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
767 * Tell whether a transaction is active on this node on the give DB.
769 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
770 struct ctdb_req_control *c,
773 struct ctdb_db_context *ctdb_db;
774 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
776 ctdb_db = find_ctdb_db(ctdb, db_id);
778 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
782 if (client->db_id == db_id) {
786 if (ctdb_db->transaction_active) {
794 backwards compatibility:
796 start a persistent store operation. passing both the key, header and
797 data to the daemon. If the client disconnects before it has issued
798 a persistent_update call to the daemon we trigger a full recovery
799 to ensure the databases are brought back in sync.
800 for now we ignore the recdata that the client has passed to us.
802 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
803 struct ctdb_req_control *c,
806 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
808 if (client == NULL) {
809 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
813 client->num_persistent_updates++;
819 backwards compatibility:
821 called to tell ctdbd that it is no longer doing a persistent update
823 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
824 struct ctdb_req_control *c,
827 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
829 if (client == NULL) {
830 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
834 if (client->num_persistent_updates > 0) {
835 client->num_persistent_updates--;
843 backwards compatibility:
845 single record varient of ctdb_control_trans2_commit for older clients
847 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
848 struct ctdb_req_control *c,
849 TDB_DATA recdata, bool *async_reply)
851 struct ctdb_marshall_buffer *m;
852 struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
855 if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
856 rec->keylen + rec->datalen) {
857 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
861 key.dptr = &rec->data[0];
862 key.dsize = rec->keylen;
863 data.dptr = &rec->data[rec->keylen];
864 data.dsize = rec->datalen;
866 m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
867 CTDB_NO_MEMORY(ctdb, m);
869 return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
872 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
877 struct ctdb_db_context *ctdb_db;
878 const char *keyname = CTDB_DB_SEQNUM_KEY;
881 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
883 ctdb_db = find_ctdb_db(ctdb, db_id);
885 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
890 key.dptr = (uint8_t *)discard_const(keyname);
891 key.dsize = strlen(keyname) + 1;
893 ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
898 if (data.dsize != sizeof(uint64_t)) {
903 *seqnum = *(uint64_t *)data.dptr;
906 talloc_free(mem_ctx);
911 * Get the sequence number of a persistent database.
913 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
921 db_id = *(uint32_t *)indata.dptr;
922 ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
927 outdata->dsize = sizeof(uint64_t);
928 outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
929 if (outdata->dptr == NULL) {
934 *(outdata->dptr) = seqnum;