4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
/*
  Bookkeeping for one in-flight persistent commit: the daemon context, the
  control request that will be answered, and the num_failed / num_sent
  counters that are compared when the commit result code is chosen.
  NOTE(review): other code in this file also uses state->status,
  state->errormsg and state->num_pending; those members and the closing
  brace are not visible in this extract.
 */
29 struct ctdb_persistent_state {
30 struct ctdb_context *ctdb;
31 struct ctdb_req_control *c;
35 uint32_t num_failed, num_sent;
39 1) all nodes fail, and all nodes reply
40 2) some nodes fail, all nodes reply
46 called when a node has acknowledged a ctdb_control_update_record call
/*
  Completion callback for a CTDB_CONTROL_UPDATE_RECORD control; it is the
  callback handed to ctdb_daemon_send_control by the commit functions in
  this file.

  private_data is the struct ctdb_persistent_state of the commit. A reply
  that arrives while ctdb->recovery_mode is not CTDB_RECOVERY_NORMAL is
  logged as ignored. A failure is logged and its status and error message
  are stored in the state. Once state->num_pending is zero the original
  control is answered with CTDB_TRANS2_COMMIT_ALLFAIL (num_failed equals
  num_sent), CTDB_TRANS2_COMMIT_SOMEFAIL (num_failed is non-zero) or
  CTDB_TRANS2_COMMIT_SUCCESS, together with state->errormsg.

  NOTE(review): lines of this function are missing from this extract
  (remaining parameters, braces, the counter updates, the failure test).
 */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49 int32_t status, TDB_DATA data,
53 struct ctdb_persistent_state *state = talloc_get_type(private_data,
54 struct ctdb_persistent_state);
56 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
57 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
58 "during recovery\n"));
63 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
65 state->status = status;
66 state->errormsg = errormsg;
/* no reply outstanding any more: pick the result code and answer */
70 if (state->num_pending == 0) {
71 enum ctdb_trans2_commit_error etype;
72 if (state->num_failed == state->num_sent) {
73 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
74 } else if (state->num_failed != 0) {
75 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
77 etype = CTDB_TRANS2_COMMIT_SUCCESS;
79 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
85 called if persistent store times out
/*
  Timed-event handler armed by the commit functions in this file. It
  answers the stored control with CTDB_TRANS2_COMMIT_TIMEOUT and a fixed
  error string. private_data is the struct ctdb_persistent_state of the
  commit.
  NOTE(review): braces and any cleanup after the reply are missing from
  this extract.
 */
87 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te,
88 struct timeval t, void *private_data)
90 struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
92 ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT,
93 "timeout in ctdb_persistent_state");
99 store a set of persistent records - called from a ctdb client when it has updated
100 some records in a persistent database. The client will have the record
101 locked for the duration of this call. The client is the dmaster when
/*
  Handle a persistent-store or transaction-commit control from a local
  client by forwarding the marshalled records in recdata to the other
  active nodes as CTDB_CONTROL_UPDATE_RECORD controls.

  Visible steps:
   - look up the database named in the marshall buffer and the client
     behind c->client_id, logging an error if either is unknown;
   - log an error if the database has an unhealthy_reason set;
   - per-case bookkeeping of client->num_persistent_updates,
     client->db_id and ctdb_db->transaction_active for
     CTDB_CONTROL_PERSISTENT_STORE, CTDB_CONTROL_TRANS2_COMMIT and
     CTDB_CONTROL_TRANS2_COMMIT_RETRY (the switch expression itself is
     not visible);
   - log a rejection when recovery is active;
   - allocate a ctdb_persistent_state and send the update to every node
     of the vnn map that is not flagged NODE_FLAGS_INACTIVE and is not
     this node, counting num_pending;
   - keep c alive by reparenting it under the state and arm
     ctdb_persistent_store_timeout with ctdb->tunable.control_timeout.

  NOTE(review): m->db_id is read from recdata.dptr and no length check on
  recdata is visible here - confirm the caller validates the buffer size.
  NOTE(review): this extract has lost lines (braces, return statements,
  the switch header, some declarations); the return values and the
  handling of *async_reply cannot be confirmed from what is shown.
 */
104 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
105 struct ctdb_req_control *c,
106 TDB_DATA recdata, bool *async_reply)
108 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
109 struct ctdb_persistent_state *state;
111 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
112 struct ctdb_db_context *ctdb_db;
114 ctdb_db = find_ctdb_db(ctdb, m->db_id);
115 if (ctdb_db == NULL) {
116 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
117 "Unknown database db_id[0x%08x]\n", m->db_id));
121 if (client == NULL) {
122 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
126 if (ctdb_db->unhealthy_reason) {
127 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
128 ctdb_db->db_name, ctdb_db->unhealthy_reason));
132 /* handling num_persistent_updates is a bit strange -
134 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
135 They don't expect num_persistent_updates to be used at all
137 2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
138 this commit to then decrement it
140 3) new clients which use TRANS2 commit functions, and
141 expect this function to increment the counter, and
142 then have it decremented in ctdb_control_trans2_error
143 or ctdb_control_trans2_finished
146 case CTDB_CONTROL_PERSISTENT_STORE:
147 if (ctdb_db->transaction_active) {
148 DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
149 "transaction is active on database "
150 "db_id[0x%08x] - refusing persistent "
151 " store for client id[0x%08x]\n",
152 ctdb_db->db_id, client->client_id));
155 if (client->num_persistent_updates > 0) {
156 client->num_persistent_updates--;
159 case CTDB_CONTROL_TRANS2_COMMIT:
160 if (ctdb_db->transaction_active) {
161 DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
162 " already a transaction commit "
163 "active on db_id[0x%08x] - forbidding "
164 "client_id[0x%08x] to commit\n",
165 ctdb_db->db_id, client->client_id));
168 if (client->db_id != 0) {
169 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
170 "client-db_id[0x%08x] != 0 "
171 "(client_id[0x%08x])\n",
172 client->db_id, client->client_id));
175 client->num_persistent_updates++;
176 ctdb_db->transaction_active = true;
177 client->db_id = m->db_id;
178 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
179 " commit transaction on db id[0x%08x]\n",
180 client->client_id, client->db_id));
182 case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
183 /* already updated from the first commit */
184 if (client->db_id != m->db_id) {
185 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
186 "retry: client-db_id[0x%08x] != "
187 "db_id[0x%08x] (client_id[0x%08x])\n",
189 m->db_id, client->client_id));
192 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
193 "transaction commit retry on "
195 client->client_id, client->db_id));
199 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
200 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
/* the state is a talloc child of the daemon context */
204 state = talloc_zero(ctdb, struct ctdb_persistent_state);
205 CTDB_NO_MEMORY(ctdb, state);
210 for (i=0;i<ctdb->vnn_map->size;i++) {
211 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
214 /* only send to active nodes */
215 if (node->flags & NODE_FLAGS_INACTIVE) {
219 /* don't send to ourselves */
220 if (node->pnn == ctdb->pnn) {
224 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
225 c->client_id, 0, recdata,
226 ctdb_persistent_callback, state);
228 DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
233 state->num_pending++;
/* nothing was sent; the handling lines are missing from this extract */
237 if (state->num_pending == 0) {
242 /* we need to wait for the replies */
245 /* need to keep the control structure around */
246 talloc_steal(state, c);
248 /* but we won't wait forever */
249 event_add_timed(ctdb->ev, state,
250 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
251 ctdb_persistent_store_timeout, state);
258 * Store a set of persistent records.
259 * This is used to roll out a transaction to all nodes.
/*
  Roll a transaction out to the cluster: forward the marshalled records in
  recdata as CTDB_CONTROL_UPDATE_RECORD controls. The reply to c is sent
  later from ctdb_persistent_callback or ctdb_persistent_store_timeout.

  Visible steps: log a rejection while recovery is active; look up the
  database named in the marshall buffer and the client behind
  c->client_id, logging an error if either is unknown; allocate a
  ctdb_persistent_state; send the update to every node of the vnn map
  that is not flagged NODE_FLAGS_INACTIVE, counting num_pending;
  reparent c under the state and arm the timeout with
  ctdb->tunable.control_timeout.

  NOTE(review): unlike ctdb_control_trans2_commit, no skip of the local
  node (node->pnn == ctdb->pnn) is visible in this loop - confirm that
  sending to this node as well is intended.
  NOTE(review): m->db_id is read with no visible length check on recdata.
  NOTE(review): lines are missing from this extract (braces, returns,
  the last argument of the send call), so return values and the handling
  of *async_reply cannot be confirmed here.
 */
261 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
262 struct ctdb_req_control *c,
263 TDB_DATA recdata, bool *async_reply)
265 struct ctdb_client *client;
266 struct ctdb_persistent_state *state;
268 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
269 struct ctdb_db_context *ctdb_db;
271 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
272 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
276 ctdb_db = find_ctdb_db(ctdb, m->db_id);
277 if (ctdb_db == NULL) {
278 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
279 "Unknown database db_id[0x%08x]\n", m->db_id));
283 client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
284 if (client == NULL) {
285 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
286 "to a client. Returning error\n"));
290 state = talloc_zero(ctdb, struct ctdb_persistent_state);
291 CTDB_NO_MEMORY(ctdb, state);
296 for (i = 0; i < ctdb->vnn_map->size; i++) {
297 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
300 /* only send to active nodes */
301 if (node->flags & NODE_FLAGS_INACTIVE) {
305 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
306 CTDB_CONTROL_UPDATE_RECORD,
307 c->client_id, 0, recdata,
308 ctdb_persistent_callback,
311 DEBUG(DEBUG_ERR,("Unable to send "
312 "CTDB_CONTROL_UPDATE_RECORD "
313 "to pnn %u\n", node->pnn));
318 state->num_pending++;
/* nothing was sent; the handling lines are missing from this extract */
322 if (state->num_pending == 0) {
327 /* we need to wait for the replies */
330 /* need to keep the control structure around */
331 talloc_steal(state, c);
333 /* but we won't wait forever */
334 event_add_timed(ctdb->ev, state,
335 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
336 ctdb_persistent_store_timeout, state);
/*
  State of one local persistent write request: the target database, the
  marshall buffer holding the records to write, and the control that is
  answered when the write completes or times out.
  NOTE(review): the closing brace is missing from this extract.
 */
342 struct ctdb_persistent_write_state {
343 struct ctdb_db_context *ctdb_db;
344 struct ctdb_marshall_buffer *m;
345 struct ctdb_req_control *c;
350 called from a child process to write the data
/*
  Apply every record of the marshall buffer state->m to the local tdb of
  state->ctdb_db inside a single tdb transaction. ctdb_childwrite calls
  this from the forked child.

  For each record the existing copy is fetched first. The update is
  refused (logged at DEBUG_CRIT) when the stored RSN is greater than or
  equal to the new RSN and the stored data differs from the new data.
  Otherwise the record is written with ctdb_ltdb_store. The transaction
  is committed after the loop.

  NOTE(review): tdb_transaction_cancel is also called at the end of this
  function, presumably on the failure path only - labels, gotos and
  return statements are missing from this extract, TODO confirm.
 */
352 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
355 struct ctdb_rec_data *rec = NULL;
356 struct ctdb_marshall_buffer *m = state->m;
358 ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
360 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
361 state->ctdb_db->db_id));
365 for (i=0;i<m->count;i++) {
366 struct ctdb_ltdb_header oldheader;
367 struct ctdb_ltdb_header header;
368 TDB_DATA key, data, olddata;
/* per-record scratch context; freed on every exit path visible below */
369 TALLOC_CTX *tmp_ctx = talloc_new(state);
371 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
374 DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
375 i, state->ctdb_db->db_id));
376 talloc_free(tmp_ctx);
380 /* fetch the old header and ensure the rsn is less than the new rsn */
381 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
383 DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
384 state->ctdb_db->db_id));
385 talloc_free(tmp_ctx);
/* an equal or newer stored RSN is tolerated only when the payload is
   byte-identical; the size test runs first, so memcmp is only reached
   when both buffers have the same length */
389 if (oldheader.rsn >= header.rsn &&
390 (olddata.dsize != data.dsize ||
391 memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
392 DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
393 state->ctdb_db->db_id,
394 (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
395 talloc_free(tmp_ctx);
399 talloc_free(tmp_ctx);
401 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
403 DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
404 state->ctdb_db->db_id));
409 ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
411 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
412 state->ctdb_db->db_id));
419 tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
425 called when the child has completed the persistent write
/*
  Completion callback handed to ctdb_childwrite by
  ctdb_control_update_record. It replies to the stored control state->c
  with the status it was called with and no error message.
  NOTE(review): lines between the cast and the reply, and anything after
  the reply, are missing from this extract.
 */
428 static void ctdb_persistent_write_callback(int status, void *private_data)
430 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
431 struct ctdb_persistent_write_state);
434 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
440 called if our lockwait child times out
/*
  Timed-event handler armed by ctdb_control_update_record. It replies to
  the stored control with status -1 and a fixed timeout message.
  private_data is the struct ctdb_persistent_write_state of the request.
  NOTE(review): braces and any cleanup after the reply are missing from
  this extract.
 */
442 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
443 struct timeval t, void *private_data)
445 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
446 struct ctdb_persistent_write_state);
447 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
/*
  Handle for one forked writer child: the owning daemon and database
  contexts, the fd event that watches the pipe from the child, the
  completion callback, and the time the write was started (passed to
  CTDB_UPDATE_LATENCY when the child reports back).
  NOTE(review): members fd, child and private_data are used elsewhere in
  this file but their declarations, and the closing brace, are missing
  from this extract.
 */
451 struct childwrite_handle {
452 struct ctdb_context *ctdb;
453 struct ctdb_db_context *ctdb_db;
454 struct fd_event *fde;
458 void (*callback)(int, void *);
459 struct timeval start_time;
/*
  talloc destructor installed on a childwrite_handle by ctdb_childwrite.
  It decrements the pending_childwrite_calls statistic and sends SIGKILL
  to the child process.
  NOTE(review): braces and the return statement are missing from this
  extract.
 */
462 static int childwrite_destructor(struct childwrite_handle *h)
464 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
465 kill(h->child, SIGKILL);
/*
  childwrite_handler: fd-event handler registered by ctdb_childwrite on
  the read end of the pipe to the writer child.

  It copies the callback, its private data and the child pid out of the
  handle, records the latency and decrements the pending statistic, moves
  the handle under a temporary talloc context and clears its destructor,
  reads one byte from the pipe, then sends SIGKILL to the child and frees
  the temporary context (and with it the handle).

  NOTE(review): the lines that test the read result and invoke the
  callback are missing from this extract, as are the closing delimiters
  of the two comments inside this block.
 */
469 /* called when the child process has finished writing the record to the
472 static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
473 uint16_t flags, void *private_data)
475 struct childwrite_handle *h = talloc_get_type(private_data,
476 struct childwrite_handle);
477 void *p = h->private_data;
478 void (*callback)(int, void *) = h->callback;
479 pid_t child = h->child;
480 TALLOC_CTX *tmp_ctx = talloc_new(ev);
484 CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
485 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
487 /* the handle needs to go away when the context is gone - when
488 the handle goes away this implicitly closes the pipe, which
490 talloc_steal(tmp_ctx, h);
492 talloc_set_destructor(h, NULL);
494 ret = read(h->fd[0], &c, 1);
496 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
502 kill(child, SIGKILL);
503 talloc_free(tmp_ctx);
506 /* this creates a child process which will take out a tdb transaction
507 and write the record to the database.
/*
  Fork a child that writes the records held in state to the persistent
  database, and arrange for completion to be reported to the parent.

  ctdb_db   database to write to
  callback  completion callback; stored in the handle together with
            state as its private data
  state     write request; also the talloc parent of the returned handle

  The pending_childwrite_calls statistic is incremented on entry and
  decremented again on each failure path visible here.

  The child closes the read end of the pipe, runs ctdb_persistent_store,
  writes one byte to the pipe and then loops until the parent process is
  gone. The parent closes the write end, installs childwrite_destructor,
  registers childwrite_handler on the read end and records the start
  time.

  NOTE(review): the result of write() in the child is ignored.
  NOTE(review): return statements, braces and some declarations are
  missing from this extract; judging by the caller, a NULL return
  signals failure.
 */
509 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
510 void (*callback)(int, void *private_data),
511 struct ctdb_persistent_write_state *state)
513 struct childwrite_handle *result;
515 pid_t parent = getpid();
517 CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
518 CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
520 if (!(result = talloc_zero(state, struct childwrite_handle))) {
521 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
525 ret = pipe(result->fd);
529 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
533 result->child = ctdb_fork(ctdb_db->ctdb);
535 if (result->child == (pid_t)-1) {
536 close(result->fd[0]);
537 close(result->fd[1]);
539 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
543 result->callback = callback;
544 result->private_data = state;
545 result->ctdb = ctdb_db->ctdb;
546 result->ctdb_db = ctdb_db;
/* child side */
548 if (result->child == 0) {
551 close(result->fd[0]);
552 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
553 ret = ctdb_persistent_store(state);
555 DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
559 write(result->fd[1], &c, 1);
561 /* make sure we die when our parent dies */
562 while (kill(parent, 0) == 0 || errno != ESRCH) {
/* parent side */
568 close(result->fd[1]);
569 set_close_on_exec(result->fd[0]);
571 talloc_set_destructor(result, childwrite_destructor);
573 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
575 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
576 EVENT_FD_READ, childwrite_handler,
578 if (result->fde == NULL) {
580 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
583 tevent_fd_set_auto_close(result->fde);
585 result->start_time = timeval_current();
591 update a record on this node if the new record has a higher rsn than the
/*
  Receive a record update from another node: write the marshalled records
  in recdata to the local persistent database through a forked child
  (ctdb_childwrite). The reply to c is sent later by
  ctdb_persistent_write_callback or ctdb_persistent_lock_timeout.

  A rejection is logged when recovery is active, when the database id in
  the marshall buffer is unknown, or when the database has an
  unhealthy_reason set. A failure to set up the child write is logged as
  well. Otherwise c is reparented under the write state and the timeout
  is armed with ctdb->tunable.control_timeout.

  NOTE(review): state comes from talloc (not zeroed) and only the ctdb_db
  member is visibly initialised here; presumably c and m are assigned on
  lines missing from this extract - confirm.
  NOTE(review): m->db_id is read with no visible length check on recdata.
  NOTE(review): the last parameter, braces and return statements are
  missing from this extract.
 */
594 int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
595 struct ctdb_req_control *c, TDB_DATA recdata,
598 struct ctdb_db_context *ctdb_db;
599 struct ctdb_persistent_write_state *state;
600 struct childwrite_handle *handle;
601 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
603 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
604 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
608 ctdb_db = find_ctdb_db(ctdb, m->db_id);
609 if (ctdb_db == NULL) {
610 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
614 if (ctdb_db->unhealthy_reason) {
615 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
616 ctdb_db->db_name, ctdb_db->unhealthy_reason));
620 state = talloc(ctdb, struct ctdb_persistent_write_state);
621 CTDB_NO_MEMORY(ctdb, state);
623 state->ctdb_db = ctdb_db;
627 /* create a child process to take out a transaction and
630 handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
631 if (handle == NULL) {
632 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
637 /* we need to wait for the replies */
640 /* need to keep the control structure around */
641 talloc_steal(state, c);
643 /* but we won't wait forever */
644 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
645 ctdb_persistent_lock_timeout, state);
652 called when a client has finished a local commit in a transaction to
653 a persistent database
/*
  Called when a client has finished the local commit of a transaction on
  a persistent database. It clears ctdb_db->transaction_active for the
  database recorded in client->db_id and decrements
  client->num_persistent_updates. If that counter is already zero an
  error is logged and the recovery mode is set to CTDB_RECOVERY_ACTIVE.
  An unknown database, or a database with no active transaction, is
  logged as an error.

  NOTE(review): client is dereferenced right after ctdb_reqid_find with
  no NULL check visible, whereas other controls in this file test the
  lookup result for NULL - confirm the lookup cannot fail here.
  NOTE(review): braces and return statements are missing from this
  extract, so the return values cannot be confirmed.
 */
655 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
656 struct ctdb_req_control *c)
658 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
659 struct ctdb_db_context *ctdb_db;
661 ctdb_db = find_ctdb_db(ctdb, client->db_id);
662 if (ctdb_db == NULL) {
663 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
664 "Unknown database 0x%08x\n", client->db_id));
667 if (!ctdb_db->transaction_active) {
668 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
669 "Database 0x%08x has no transaction commit "
670 "started\n", client->db_id));
674 ctdb_db->transaction_active = false;
/* a zero counter means the bookkeeping is out of sync: force recovery */
677 if (client->num_persistent_updates == 0) {
678 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
679 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
680 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
683 client->num_persistent_updates--;
685 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
686 "transaction commit db_id[0x%08x]\n",
687 client->client_id, ctdb_db->db_id));
693 called when a client gets an error committing its database
694 during a transaction commit
/*
  Called when a client hit an error while committing a transaction. It
  clears ctdb_db->transaction_active for the database recorded in
  client->db_id, logs an error if client->num_persistent_updates is
  zero, decrements that counter (presumably only in the else branch;
  the branch line is missing from this extract), logs the failure and
  sets the recovery mode to CTDB_RECOVERY_ACTIVE.

  NOTE(review): client is dereferenced right after ctdb_reqid_find with
  no NULL check visible, whereas other controls in this file test the
  lookup result for NULL - confirm the lookup cannot fail here.
  NOTE(review): braces and return statements are missing from this
  extract, so the return values cannot be confirmed.
 */
696 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
697 struct ctdb_req_control *c)
699 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
700 struct ctdb_db_context *ctdb_db;
702 ctdb_db = find_ctdb_db(ctdb, client->db_id);
703 if (ctdb_db == NULL) {
704 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
705 "Unknown database 0x%08x\n", client->db_id));
708 if (!ctdb_db->transaction_active) {
709 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
710 "Database 0x%08x has no transaction commit "
711 "started\n", client->db_id));
715 ctdb_db->transaction_active = false;
718 if (client->num_persistent_updates == 0) {
719 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
721 client->num_persistent_updates--;
724 DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
725 " db_id[0x%08x] - forcing recovery\n",
727 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
733 * Tell whether a transaction is active on this node on the given DB.
/*
  Report whether a transaction is active on database db_id.
  Visible checks, in order: an unknown db_id is logged as an error; the
  case where the calling client itself has db_id recorded in
  client->db_id is tested; then ctdb_db->transaction_active is tested.

  NOTE(review): the value returned by each branch is missing from this
  extract.
  NOTE(review): client is dereferenced with no NULL check visible after
  ctdb_reqid_find - confirm the lookup cannot fail here.
 */
735 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
736 struct ctdb_req_control *c,
739 struct ctdb_db_context *ctdb_db;
740 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
742 ctdb_db = find_ctdb_db(ctdb, db_id);
744 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
748 if (client->db_id == db_id) {
752 if (ctdb_db->transaction_active) {
760 backwards compatibility:
762 start a persistent store operation. passing both the key, header and
763 data to the daemon. If the client disconnects before it has issued
764 a persistent_update call to the daemon we trigger a full recovery
765 to ensure the databases are brought back in sync.
766 for now we ignore the recdata that the client has passed to us.
/*
  Backwards-compatibility control: increments
  client->num_persistent_updates for the client identified by
  c->client_id. An unknown client is logged as an error.
  NOTE(review): the last parameter, braces and return statements are
  missing from this extract.
 */
768 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
769 struct ctdb_req_control *c,
772 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
774 if (client == NULL) {
775 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
779 client->num_persistent_updates++;
785 backwards compatibility:
787 called to tell ctdbd that it is no longer doing a persistent update
/*
  Backwards-compatibility control: decrements
  client->num_persistent_updates for the client identified by
  c->client_id, but only while the counter is greater than zero. An
  unknown client is logged as an error.
  NOTE(review): the last parameter, braces and return statements are
  missing from this extract.
 */
789 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
790 struct ctdb_req_control *c,
793 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
795 if (client == NULL) {
796 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
800 if (client->num_persistent_updates > 0) {
801 client->num_persistent_updates--;
809 backwards compatibility:
811 single record variant of ctdb_control_trans2_commit for older clients
/*
  Backwards-compatibility entry point for clients that send a single
  record. recdata is interpreted as one struct ctdb_rec_data; its key and
  data are repacked into a marshall buffer allocated under c and passed
  on to ctdb_control_trans2_commit.

  A recdata whose size does not equal the fixed part of struct
  ctdb_rec_data plus keylen plus datalen is logged as bad.

  NOTE(review): rec->keylen and rec->datalen are read before any check
  that recdata.dsize covers the fixed part of struct ctdb_rec_data, and
  their sum is not checked for wrap-around - confirm the caller
  guarantees a minimum size.
  NOTE(review): rec->reqid is passed for two consecutive arguments of
  ctdb_marshall_add; presumably it carries the database id for this
  legacy control - TODO confirm.
 */
813 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
814 struct ctdb_req_control *c,
815 TDB_DATA recdata, bool *async_reply)
817 struct ctdb_marshall_buffer *m;
818 struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
821 if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
822 rec->keylen + rec->datalen) {
823 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
/* the key comes first in rec->data, immediately followed by the data */
827 key.dptr = &rec->data[0];
828 key.dsize = rec->keylen;
829 data.dptr = &rec->data[rec->keylen];
830 data.dsize = rec->datalen;
832 m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
833 CTDB_NO_MEMORY(ctdb, m);
835 return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
/*
  Fetch the record stored under CTDB_DB_SEQNUM_KEY (the key includes the
  terminating NUL byte) from database db_id and store its value in
  *seqnum. The fetched data lives in a temporary talloc context that is
  freed at the end of the function. An unknown db_id is logged as an
  error, and the record size is compared against sizeof(uint64_t) before
  the value is read.

  NOTE(review): the value is read through a cast of data.dptr to a
  uint64_t pointer, which assumes the buffer is suitably aligned.
  NOTE(review): parameters, braces, labels and return statements are
  missing from this extract, so the handling of each failure cannot be
  confirmed.
 */
838 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
843 struct ctdb_db_context *ctdb_db;
844 const char *keyname = CTDB_DB_SEQNUM_KEY;
847 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
849 ctdb_db = find_ctdb_db(ctdb, db_id);
851 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
856 key.dptr = (uint8_t *)discard_const(keyname);
857 key.dsize = strlen(keyname) + 1;
859 ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
864 if (data.dsize != sizeof(uint64_t)) {
869 *seqnum = *(uint64_t *)data.dptr;
872 talloc_free(mem_ctx);
877 * Get the sequence number of a persistent database.
/*
  Control handler that returns the sequence number of a persistent
  database. The database id is read from indata, the number is obtained
  through ctdb_get_db_seqnum, and a zeroed buffer of sizeof(uint64_t)
  bytes is allocated under outdata for the result.

  NOTE(review): no check of indata.dsize is visible before the uint32_t
  read from indata.dptr.
  NOTE(review): outdata->dptr is assigned from a cast to a uint8_t
  pointer, so if dptr has that type the final assignment stores only the
  low byte of seqnum. It most likely needs to write all
  sizeof(uint64_t) bytes (for example with memcpy) - confirm against the
  TDB_DATA definition.
  NOTE(review): parameters, braces and returns are missing, and the
  definition continues past the end of this extract.
 */
879 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
887 db_id = *(uint32_t *)indata.dptr;
888 ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
893 outdata->dsize = sizeof(uint64_t);
894 outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
895 if (outdata->dptr == NULL) {
900 *(outdata->dptr) = seqnum;