4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
// Per-request bookkeeping allocated by ctdb_control_trans2_commit and
// ctdb_control_trans3_commit and handed to the reply and timeout callbacks.
// NOTE(review): num_pending, status and errormsg are used elsewhere in this
// file but their declarations are not visible in this view.
29 struct ctdb_persistent_state {
// Daemon context; used when replying and when checking recovery_mode.
30 struct ctdb_context *ctdb;
31 struct ctdb_db_context *ctdb_db; /* used by trans3_commit */
// The client control this state answers; replies are sent against it.
32 struct ctdb_req_control *c;
// Counters compared in ctdb_persistent_callback to pick the commit outcome.
36 uint32_t num_failed, num_sent;
40 1) all nodes fail, and all nodes reply
41 2) some nodes fail, all nodes reply
47 called when a node has acknowledged a ctdb_control_update_record call
// Reply handler registered with ctdb_daemon_send_control for every
// CTDB_CONTROL_UPDATE_RECORD sent by the commit controls in this file.
// private_data is converted to a struct ctdb_persistent_state.
// NOTE(review): the tail of the parameter list and several body lines are
// not visible in this view; the comments below describe only what is shown.
49 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
50 int32_t status, TDB_DATA data,
54 struct ctdb_persistent_state *state = talloc_get_type(private_data,
55 struct ctdb_persistent_state);
56 enum ctdb_trans2_commit_error etype;
// A reply that arrives while recovery_mode is not NORMAL is logged as ignored.
58 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
59 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
60 "during recovery\n"));
// Failure reporting: log status and message, then record both on the state
// so the final reply can carry the error message.
// NOTE(review): the condition guarding this path is not visible here.
65 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
66 status, errormsg?errormsg:"no error message given"));
67 state->status = status;
68 state->errormsg = errormsg;
72 * If a node failed to complete the update_record control,
73 * then either a recovery is already running or something
74 * bad is going on. So trigger a recovery and let the
75 * recovery finish the transaction, sending back the reply
76 * for the trans3_commit control to the client.
78 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
// Presumably waits for further replies while any are outstanding - TODO confirm.
84 if (state->num_pending != 0) {
// Outcome selection: every send failed, some failed, or none failed.
88 if (state->num_failed == state->num_sent) {
89 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
90 } else if (state->num_failed != 0) {
91 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
93 etype = CTDB_TRANS2_COMMIT_SUCCESS;
// Reply to the originating control with the outcome and any saved message.
96 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
101 called if persistent store times out
// Timer callback armed by the commit controls via event_add_timed.
// private_data is converted to a struct ctdb_persistent_state.
103 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te,
104 struct timeval t, void *private_data)
106 struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
// A timeout that fires while recovery_mode is not NORMAL is logged as ignored.
108 if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
109 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
110 "timeout during recovery\n"));
// Otherwise answer the client control with CTDB_TRANS2_COMMIT_TIMEOUT.
114 ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT,
115 "timeout in ctdb_persistent_state");
121 store a set of persistent records - called from a ctdb client when it has updated
122 some records in a persistent database. The client will have the record
123 locked for the duration of this call. The client is the dmaster when
// Control handler that pushes a marshalled set of persistent records to the
// other nodes with CTDB_CONTROL_UPDATE_RECORD and replies asynchronously.
//   ctdb        - daemon context
//   c           - the client control being handled
//   recdata     - payload, interpreted as a struct ctdb_marshall_buffer
//   async_reply - out flag; NOTE(review): the line that sets it is not
//                 visible in this view
// NOTE(review): return statements, the switch header and some declarations
// are not visible in this view.
126 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
127 struct ctdb_req_control *c,
128 TDB_DATA recdata, bool *async_reply)
130 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
131 struct ctdb_persistent_state *state;
133 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
134 struct ctdb_db_context *ctdb_db;
// Validation: the database named in the buffer must be known.
136 ctdb_db = find_ctdb_db(ctdb, m->db_id);
137 if (ctdb_db == NULL) {
138 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
139 "Unknown database db_id[0x%08x]\n", m->db_id));
// Validation: the control must map to a known client.
143 if (client == NULL) {
144 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
// Validation: a database with an unhealthy_reason set is reported as an error.
148 if (ctdb_db->unhealthy_reason) {
149 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
150 ctdb_db->db_name, ctdb_db->unhealthy_reason));
154 /* handling num_persistent_updates is a bit strange -
156 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
157 They don't expect num_persistent_updates to be used at all
159 2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
160 this commit to then decrement it
162 3) new clients which use TRANS2 commit functions, and
163 expect this function to increment the counter, and
164 then have it decremented in ctdb_control_trans2_error
165 or ctdb_control_trans2_finished
// Legacy single-store opcode: logs a refusal if a transaction is active,
// and decrements the per-client counter when it is positive.
168 case CTDB_CONTROL_PERSISTENT_STORE:
169 if (ctdb_db->transaction_active) {
170 DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
171 "transaction is active on database "
172 "db_id[0x%08x] - refusing persistent "
173 " store for client id[0x%08x]\n",
174 ctdb_db->db_id, client->client_id));
177 if (client->num_persistent_updates > 0) {
178 client->num_persistent_updates--;
// First commit attempt: requires no active transaction on the database and
// no database already bound to the client, then marks both as in use.
181 case CTDB_CONTROL_TRANS2_COMMIT:
182 if (ctdb_db->transaction_active) {
183 DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
184 " already a transaction commit "
185 "active on db_id[0x%08x] - forbidding "
186 "client_id[0x%08x] to commit\n",
187 ctdb_db->db_id, client->client_id));
190 if (client->db_id != 0) {
191 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
192 "client-db_id[0x%08x] != 0 "
193 "(client_id[0x%08x])\n",
194 client->db_id, client->client_id));
197 client->num_persistent_updates++;
198 ctdb_db->transaction_active = true;
199 client->db_id = m->db_id;
200 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
201 " commit transaction on db id[0x%08x]\n",
202 client->client_id, client->db_id));
// Retry: the client must already be bound to the same database.
204 case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
205 /* already updated from the first commit */
206 if (client->db_id != m->db_id) {
207 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
208 "retry: client-db_id[0x%08x] != "
209 "db_id[0x%08x] (client_id[0x%08x])\n",
211 m->db_id, client->client_id));
214 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
215 "transaction commit retry on "
217 client->client_id, client->db_id));
// Commits are logged as rejected while recovery_mode is not NORMAL.
221 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
222 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
// Zeroed tracking state, parented to the daemon context.
226 state = talloc_zero(ctdb, struct ctdb_persistent_state);
227 CTDB_NO_MEMORY(ctdb, state);
// Fan the update out to every node listed in the vnn map.
232 for (i=0;i<ctdb->vnn_map->size;i++) {
233 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
236 /* only send to active nodes */
237 if (node->flags & NODE_FLAGS_INACTIVE) {
241 /* don't send to ourselves */
242 if (node->pnn == ctdb->pnn) {
246 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
247 c->client_id, 0, recdata,
248 ctdb_persistent_callback, state);
250 DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
// One more reply is now expected by ctdb_persistent_callback.
255 state->num_pending++;
// Nothing was sent. NOTE(review): the handling of this case is not visible.
259 if (state->num_pending == 0) {
264 /* we need to wait for the replies */
267 /* need to keep the control structure around */
// Reparent c under state so it stays alive as long as state does.
268 talloc_steal(state, c);
270 /* but we won't wait forever */
// The timer is allocated on state and fires ctdb_persistent_store_timeout.
271 event_add_timed(ctdb->ev, state,
272 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
273 ctdb_persistent_store_timeout, state);
// talloc destructor installed by ctdb_control_trans3_commit. It clears the
// back-pointer held in the database context so that a later trans3 commit
// does not see a stale persistent_state.
278 static int ctdb_persistent_state_destructor(struct ctdb_persistent_state *state)
// ctdb_db is only set on the trans3 path, so it may be NULL here.
280 if (state->ctdb_db != NULL) {
281 state->ctdb_db->persistent_state = NULL;
288 * Store a set of persistent records.
289 * This is used to roll out a transaction to all nodes.
// Control handler that rolls a marshalled set of persistent records out to
// the nodes in the vnn map using CTDB_CONTROL_UPDATE_RECORD.
//   ctdb        - daemon context
//   c           - the client control being handled
//   recdata     - payload, interpreted as a struct ctdb_marshall_buffer
//   async_reply - out flag; NOTE(review): the line that sets it is not
//                 visible in this view
// NOTE(review): return statements and some declarations are not visible.
291 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
292 struct ctdb_req_control *c,
293 TDB_DATA recdata, bool *async_reply)
295 struct ctdb_client *client;
296 struct ctdb_persistent_state *state;
298 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
299 struct ctdb_db_context *ctdb_db;
// Commits are logged as rejected while recovery_mode is not NORMAL.
301 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
302 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
// The database named in the buffer must be known.
306 ctdb_db = find_ctdb_db(ctdb, m->db_id);
307 if (ctdb_db == NULL) {
308 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
309 "Unknown database db_id[0x%08x]\n", m->db_id));
// A non-NULL persistent_state means a commit is already in flight for this db.
313 if (ctdb_db->persistent_state != NULL) {
314 DEBUG(DEBUG_ERR, (__location__ " Error: "
315 "ctdb_control_trans3_commit "
316 "called while a transaction commit is "
317 "active. db_id[0x%08x]\n", m->db_id));
// The control must map to a known client.
321 client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
322 if (client == NULL) {
323 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
324 "to a client. Returning error\n"));
// Zeroed tracking state, parented to and published on the database context.
328 ctdb_db->persistent_state = talloc_zero(ctdb_db,
329 struct ctdb_persistent_state);
330 CTDB_NO_MEMORY(ctdb, ctdb_db->persistent_state);
332 state = ctdb_db->persistent_state;
334 state->ctdb_db = ctdb_db;
// The destructor resets ctdb_db->persistent_state when state is freed.
336 talloc_set_destructor(state, ctdb_persistent_state_destructor);
// Fan the update out to every node listed in the vnn map.
// NOTE(review): unlike ctdb_control_trans2_commit, no skip of the local
// node is visible in this loop.
338 for (i = 0; i < ctdb->vnn_map->size; i++) {
339 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
342 /* only send to active nodes */
343 if (node->flags & NODE_FLAGS_INACTIVE) {
347 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
348 CTDB_CONTROL_UPDATE_RECORD,
349 c->client_id, 0, recdata,
350 ctdb_persistent_callback,
353 DEBUG(DEBUG_ERR,("Unable to send "
354 "CTDB_CONTROL_UPDATE_RECORD "
355 "to pnn %u\n", node->pnn));
// One more reply is now expected by ctdb_persistent_callback.
360 state->num_pending++;
// Nothing was sent. NOTE(review): the handling of this case is not visible.
364 if (state->num_pending == 0) {
369 /* we need to wait for the replies */
372 /* need to keep the control structure around */
// Reparent c under state so it stays alive as long as state does.
373 talloc_steal(state, c);
375 /* but we won't wait forever */
// The timer is allocated on state and fires ctdb_persistent_store_timeout.
376 event_add_timed(ctdb->ev, state,
377 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
378 ctdb_persistent_store_timeout, state);
// State for one CTDB_CONTROL_UPDATE_RECORD request handled on this node.
384 struct ctdb_persistent_write_state {
// Database the records are written to.
385 struct ctdb_db_context *ctdb_db;
// Marshalled records iterated by ctdb_persistent_store.
386 struct ctdb_marshall_buffer *m;
// The control to reply to once the write finishes or times out.
387 struct ctdb_req_control *c;
392 called from a child process to write the data
// Writes every record in state->m to the local tdb inside one tdb
// transaction. Called from ctdb_childwrite in the forked child.
// NOTE(review): return statements, failure checks on ret and any goto
// labels are not visible in this view.
394 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
397 struct ctdb_rec_data *rec = NULL;
398 struct ctdb_marshall_buffer *m = state->m;
400 ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
402 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
403 state->ctdb_db->db_id));
// Walk the marshalled records one at a time.
407 for (i=0;i<m->count;i++) {
408 struct ctdb_ltdb_header oldheader;
409 struct ctdb_ltdb_header header;
410 TDB_DATA key, data, olddata;
// Scratch context for the old record fetched below; freed on every path shown.
411 TALLOC_CTX *tmp_ctx = talloc_new(state);
413 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
416 DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
417 i, state->ctdb_db->db_id));
418 talloc_free(tmp_ctx);
422 /* fetch the old header and ensure the rsn is less than the new rsn */
423 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
425 DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
426 state->ctdb_db->db_id));
427 talloc_free(tmp_ctx);
// Flagged as critical only when the stored RSN is not older AND the data
// differs in size or content; identical data with an equal or newer RSN
// does not match this condition.
431 if (oldheader.rsn >= header.rsn &&
432 (olddata.dsize != data.dsize ||
433 memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
434 DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
435 state->ctdb_db->db_id,
436 (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
437 talloc_free(tmp_ctx);
// The old data is no longer needed once the comparison is done.
441 talloc_free(tmp_ctx);
443 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
445 DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
446 state->ctdb_db->db_id));
451 ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
453 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
454 state->ctdb_db->db_id));
// Presumably the shared failure exit that abandons the transaction - TODO confirm.
461 tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
467 called when the child has completed the persistent write
// Completion callback given to ctdb_childwrite by ctdb_control_update_record.
// Forwards the status of the child write as the reply to the pending control.
470 static void ctdb_persistent_write_callback(int status, void *private_data)
472 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
473 struct ctdb_persistent_write_state);
// No reply data and no error message are attached, only the status.
476 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
482 called if our lockwait child times out
// Timer callback armed by ctdb_control_update_record. Replies to the pending
// control with status -1 and a timeout message.
484 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
485 struct timeval t, void *private_data)
487 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
488 struct ctdb_persistent_write_state);
489 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
// Handle for one forked writer child created by ctdb_childwrite.
// NOTE(review): the child, fd and private_data members are used elsewhere in
// this file but their declarations are not visible in this view.
493 struct childwrite_handle {
494 struct ctdb_context *ctdb;
495 struct ctdb_db_context *ctdb_db;
// Read-side fd event on the pipe from the child.
496 struct fd_event *fde;
// Invoked by childwrite_handler with a status and private_data.
500 void (*callback)(int, void *);
// Set with timeval_current() and passed to CTDB_UPDATE_LATENCY in the handler.
501 struct timeval start_time;
// talloc destructor for a childwrite_handle that is freed before
// childwrite_handler runs: drops the pending counter and kills the child.
504 static int childwrite_destructor(struct childwrite_handle *h)
506 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
507 kill(h->child, SIGKILL);
511 /* called when the child process has finished writing the record to the
// fd event callback for the read end of the pipe to the writer child.
// private_data is converted to a struct childwrite_handle.
// NOTE(review): the lines that turn the read result into a status and invoke
// the callback are not visible in this view.
514 static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
515 uint16_t flags, void *private_data)
517 struct childwrite_handle *h = talloc_get_type(private_data,
518 struct childwrite_handle);
// Copy what is needed out of the handle before it is handed to tmp_ctx.
519 void *p = h->private_data;
520 void (*callback)(int, void *) = h->callback;
521 pid_t child = h->child;
522 TALLOC_CTX *tmp_ctx = talloc_new(ev);
// Record latency and drop the pending counter for this write.
526 CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
527 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
529 /* the handle needs to go away when the context is gone - when
530 the handle goes away this implicitly closes the pipe, which
// Tie the handle lifetime to tmp_ctx, which is freed at the end.
532 talloc_steal(tmp_ctx, h);
// Clear the destructor: the counter was already decremented above and the
// child is killed explicitly below, so it must not run again on free.
534 talloc_set_destructor(h, NULL);
// One status byte is expected from the child.
536 ret = read(h->fd[0], &c, 1);
538 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
544 kill(child, SIGKILL);
545 talloc_free(tmp_ctx);
548 /* this creates a child process which will take out a tdb transaction
549 and write the record to the database.
// Forks a child that runs ctdb_persistent_store(state) and reports one status
// byte over a pipe; the parent watches the read end with childwrite_handler.
//   ctdb_db  - database to write to
//   callback - stored on the handle for childwrite_handler
//   state    - write state; also the talloc parent of the returned handle
// Returns the new handle. NOTE(review): the return statements, including the
// failure returns, are not visible in this view.
551 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
552 void (*callback)(int, void *private_data),
553 struct ctdb_persistent_write_state *state)
555 struct childwrite_handle *result;
// Captured before the fork so the child can watch for the parent exiting.
557 pid_t parent = getpid();
559 CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
560 CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
// Each failure path shown below undoes the pending counter increment.
562 if (!(result = talloc_zero(state, struct childwrite_handle))) {
563 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
567 ret = pipe(result->fd);
571 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
575 result->child = ctdb_fork(ctdb_db->ctdb);
// Fork failed: close both pipe ends.
577 if (result->child == (pid_t)-1) {
578 close(result->fd[0]);
579 close(result->fd[1]);
581 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
585 result->callback = callback;
586 result->private_data = state;
587 result->ctdb = ctdb_db->ctdb;
588 result->ctdb_db = ctdb_db;
// Child side: keep only the write end, do the store, report one byte.
590 if (result->child == 0) {
593 close(result->fd[0]);
594 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
595 ret = ctdb_persistent_store(state);
597 DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
// NOTE(review): the return value of write is not used on this line.
601 write(result->fd[1], &c, 1);
603 /* make sure we die when our parent dies */
// Loops for as long as the parent pid can still be signalled.
604 while (kill(parent, 0) == 0 || errno != ESRCH) {
// Parent side: keep only the read end.
610 close(result->fd[1]);
611 set_close_on_exec(result->fd[0]);
// From here on, freeing the handle kills the child via the destructor.
613 talloc_set_destructor(result, childwrite_destructor);
615 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
617 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
618 EVENT_FD_READ, childwrite_handler,
620 if (result->fde == NULL) {
622 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
// Let the fd event own and close the read end of the pipe.
625 tevent_fd_set_auto_close(result->fde);
627 result->start_time = timeval_current();
633 update a record on this node if the new record has a higher rsn than the
// Control handler for CTDB_CONTROL_UPDATE_RECORD: validates the request and
// starts a child process that writes the marshalled records to the local db.
//   ctdb    - daemon context
//   c       - the control being handled
//   recdata - payload, interpreted as a struct ctdb_marshall_buffer
// NOTE(review): the tail of the parameter list, the return statements and
// some assignments are not visible in this view.
636 int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
637 struct ctdb_req_control *c, TDB_DATA recdata,
640 struct ctdb_db_context *ctdb_db;
641 struct ctdb_persistent_write_state *state;
642 struct childwrite_handle *handle;
643 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
// Updates are logged as rejected while recovery_mode is not NORMAL.
645 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
646 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
650 ctdb_db = find_ctdb_db(ctdb, m->db_id);
651 if (ctdb_db == NULL) {
652 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
656 if (ctdb_db->unhealthy_reason) {
657 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
658 ctdb_db->db_name, ctdb_db->unhealthy_reason));
// NOTE(review): talloc does not zero the allocation and only the ctdb_db
// assignment is visible here; confirm that c and m are set before use.
662 state = talloc(ctdb, struct ctdb_persistent_write_state);
663 CTDB_NO_MEMORY(ctdb, state);
665 state->ctdb_db = ctdb_db;
669 /* create a child process to take out a transaction and
672 handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
673 if (handle == NULL) {
674 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
679 /* we need to wait for the replies */
682 /* need to keep the control structure around */
// Reparent c under state so it stays alive as long as state does.
683 talloc_steal(state, c);
685 /* but we won't wait forever */
// The timer is allocated on state and fires ctdb_persistent_lock_timeout.
686 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
687 ctdb_persistent_lock_timeout, state);
694 called when a client has finished a local commit in a transaction to
695 a persistent database
// Control handler run when a client reports that its transaction commit is
// done: clears the transaction flag on the database bound to the client and
// decrements the per-client update counter.
// NOTE(review): return statements and any reset of client->db_id are not
// visible in this view.
697 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
698 struct ctdb_req_control *c)
// NOTE(review): client is dereferenced below with no NULL check visible,
// while ctdb_control_trans2_commit does check the same lookup.
700 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
701 struct ctdb_db_context *ctdb_db;
703 ctdb_db = find_ctdb_db(ctdb, client->db_id);
704 if (ctdb_db == NULL) {
705 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
706 "Unknown database 0x%08x\n", client->db_id));
709 if (!ctdb_db->transaction_active) {
710 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
711 "Database 0x%08x has no transaction commit "
712 "started\n", client->db_id));
716 ctdb_db->transaction_active = false;
// A zero counter here is treated as inconsistent state and forces recovery.
719 if (client->num_persistent_updates == 0) {
720 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
721 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
722 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
725 client->num_persistent_updates--;
727 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
728 "transaction commit db_id[0x%08x]\n",
729 client->client_id, ctdb_db->db_id));
735 called when a client gets an error committing its database
736 during a transaction commit
// Control handler run when a client reports a failed transaction commit:
// clears the transaction flag on the database bound to the client, adjusts
// the per-client counter and switches the node to recovery mode.
// NOTE(review): return statements and some lines are not visible here.
738 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
739 struct ctdb_req_control *c)
// NOTE(review): client is dereferenced below with no NULL check visible,
// while ctdb_control_trans2_commit does check the same lookup.
741 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
742 struct ctdb_db_context *ctdb_db;
744 ctdb_db = find_ctdb_db(ctdb, client->db_id);
745 if (ctdb_db == NULL) {
746 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
747 "Unknown database 0x%08x\n", client->db_id));
750 if (!ctdb_db->transaction_active) {
751 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
752 "Database 0x%08x has no transaction commit "
753 "started\n", client->db_id));
757 ctdb_db->transaction_active = false;
// Presumably the decrement sits in an else branch that is not visible - TODO confirm.
760 if (client->num_persistent_updates == 0) {
761 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
763 client->num_persistent_updates--;
766 DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
767 " db_id[0x%08x] - forcing recovery\n",
769 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
775 * Tell whether a transaction is active on this node on the given DB.
// Control handler that reports on the transaction state of database db_id.
// It distinguishes the case where the calling client is itself bound to
// db_id from the case where the database has a transaction active.
// NOTE(review): the db_id parameter declaration, the NULL test on ctdb_db and
// all return values are not visible in this view.
777 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
778 struct ctdb_req_control *c,
781 struct ctdb_db_context *ctdb_db;
// NOTE(review): client is dereferenced below with no NULL check visible.
782 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
784 ctdb_db = find_ctdb_db(ctdb, db_id);
786 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
// The caller is the client bound to this database.
790 if (client->db_id == db_id) {
// Some client has a transaction commit active on this database.
794 if (ctdb_db->transaction_active) {
802 backwards compatibility:
804 start a persistent store operation. passing both the key, header and
805 data to the daemon. If the client disconnects before it has issued
806 a persistent_update call to the daemon we trigger a full recovery
807 to ensure the databases are brought back in sync.
808 for now we ignore the recdata that the client has passed to us.
// Backwards-compatibility control: increments the num_persistent_updates
// counter of the calling client.
// NOTE(review): the tail of the parameter list and the return statements are
// not visible in this view.
810 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
811 struct ctdb_req_control *c,
814 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
// The control must map to a known client.
816 if (client == NULL) {
817 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
821 client->num_persistent_updates++;
827 backwards compatibility:
829 called to tell ctdbd that it is no longer doing a persistent update
// Backwards-compatibility control: decrements the num_persistent_updates
// counter of the calling client, never taking it below zero.
// NOTE(review): the tail of the parameter list and the return statements are
// not visible in this view.
831 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
832 struct ctdb_req_control *c,
835 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
// The control must map to a known client.
837 if (client == NULL) {
838 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
// Guarded so the counter is only decremented when positive.
842 if (client->num_persistent_updates > 0) {
843 client->num_persistent_updates--;
851 backwards compatibility:
853 single record variant of ctdb_control_trans2_commit for older clients
// Backwards-compatibility control: wraps a single ctdb_rec_data record into a
// marshall buffer and hands it to ctdb_control_trans2_commit.
//   recdata     - payload, interpreted as one struct ctdb_rec_data
//   async_reply - passed through to ctdb_control_trans2_commit
// NOTE(review): the key and data declarations and the failure return are not
// visible in this view.
855 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
856 struct ctdb_req_control *c,
857 TDB_DATA recdata, bool *async_reply)
859 struct ctdb_marshall_buffer *m;
860 struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
// The payload must be exactly the record header plus key and data bytes.
// NOTE(review): keylen and datalen are read from rec before any visible check
// that recdata.dsize covers the fixed header - confirm the caller enforces a
// minimum size.
863 if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
864 rec->keylen + rec->datalen) {
865 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
// Key bytes come first in rec->data, followed by the data bytes.
869 key.dptr = &rec->data[0];
870 key.dsize = rec->keylen;
871 data.dptr = &rec->data[rec->keylen];
872 data.dsize = rec->datalen;
// The buffer is allocated on c; rec->reqid is passed for both integer arguments.
874 m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
875 CTDB_NO_MEMORY(ctdb, m);
877 return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
// Reads the record stored under CTDB_DB_SEQNUM_KEY in database db_id and
// copies its value into *seqnum.
// NOTE(review): the db_id and seqnum parameter declarations, the failure
// checks and the return statement are not visible in this view.
880 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
885 struct ctdb_db_context *ctdb_db;
886 const char *keyname = CTDB_DB_SEQNUM_KEY;
// Scratch context for the fetched record; freed at the end of the function.
889 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
891 ctdb_db = find_ctdb_db(ctdb, db_id);
893 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
// The key length includes the terminating NUL byte of the key name.
898 key.dptr = (uint8_t *)discard_const(keyname);
899 key.dsize = strlen(keyname) + 1;
901 ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
// The stored value must be exactly one uint64_t wide.
906 if (data.dsize != sizeof(uint64_t)) {
911 *seqnum = *(uint64_t *)data.dptr;
914 talloc_free(mem_ctx);
919 * Get the sequence number of a persistent database.
// Control handler that returns the sequence number of a persistent database.
// The database id is read from indata and the result is placed in outdata.
// NOTE(review): the indata and outdata parameter declarations, the locals and
// the return statement are not visible in this view.
921 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
// NOTE(review): no check that indata holds at least sizeof(uint32_t) bytes is
// visible here - confirm the dispatcher validates the size.
929 db_id = *(uint32_t *)indata.dptr;
930 ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
// The reply buffer is one zeroed uint64_t, parented to outdata.
935 outdata->dsize = sizeof(uint64_t);
936 outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
937 if (outdata->dptr == NULL) {
// NOTE(review): dptr is assigned through a uint8_t pointer cast above, so this
// store looks like it writes only the low byte of seqnum. Confirm against the
// TDB_DATA definition; a uint64_t store or memcpy may be intended.
942 *(outdata->dptr) = seqnum;