4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/events/events.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
/*
  Bookkeeping for one commit that is being replicated to the other
  nodes.  One instance is allocated per ctdb_control_trans2_commit()
  call and handed to every per-node completion callback.
  NOTE(review): the callback also reads and writes status, errormsg and
  num_pending members; their declarations and the closing brace are not
  visible in this excerpt.
*/
29 struct ctdb_persistent_state {
30 struct ctdb_context *ctdb;
/* the client control that is answered once the outcome is known */
31 struct ctdb_req_control *c;
/* compared with each other to classify the result as all/some/no failures */
35 uint32_t num_failed, num_sent;
39 1) all nodes fail, and all nodes reply
40 2) some nodes fail, all nodes reply
46 called when a node has acknowledged a ctdb_control_update_record call
/*
  Completion callback for the CTDB_CONTROL_UPDATE_RECORD controls sent
  by ctdb_control_trans2_commit().  private_data is the shared
  struct ctdb_persistent_state.  The status and error message of the
  reply are recorded in the state, and once num_pending is zero the
  original client control is answered with a ctdb_trans2_commit_error
  value derived from num_failed and num_sent.
  NOTE(review): the remaining parameters, the failure test around the
  DEBUG call and the updates of num_pending/num_failed are not visible
  in this excerpt.
*/
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49 int32_t status, TDB_DATA data,
53 struct ctdb_persistent_state *state = talloc_get_type(private_data,
54 struct ctdb_persistent_state);
57 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
/* keep the most recent status and message so they can be passed back to the client */
59 state->status = status;
60 state->errormsg = errormsg;
/* nothing is outstanding any more: classify the overall result and reply */
64 if (state->num_pending == 0) {
65 enum ctdb_trans2_commit_error etype;
/* every control that was sent has failed */
66 if (state->num_failed == state->num_sent) {
67 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
/* at least one, but not all, failed */
68 } else if (state->num_failed != 0) {
69 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
71 etype = CTDB_TRANS2_COMMIT_SUCCESS;
73 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
79 called if persistent store times out
/*
  Timer handler registered with event_add_timed() by
  ctdb_control_trans2_commit().  private_data is the
  struct ctdb_persistent_state of the commit; the stored client control
  is answered with CTDB_TRANS2_COMMIT_TIMEOUT and a fixed error string.
  NOTE(review): any cleanup of the state after the reply is not visible
  in this excerpt.
*/
81 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te,
82 struct timeval t, void *private_data)
84 struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
86 ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT,
87 "timeout in ctdb_persistent_state");
93 store a set of persistent records - called from a ctdb client when it has updated
94 some records in a persistent database. The client will have the record
95 locked for the duration of this call. The client is the dmaster when
/*
  Handle a commit of persistent records issued by a client.

  recdata.dptr is read as a struct ctdb_marshall_buffer and its db_id
  selects the database.  The visible checks log and refuse the request
  while recovery is active, when the database is unknown, and when
  c->client_id cannot be matched to a struct ctdb_client.  Depending on
  the control opcode the client's num_persistent_updates and db_id and
  the database's transaction_active flag are then adjusted.  Finally a
  CTDB_CONTROL_UPDATE_RECORD carrying the unchanged recdata is sent to
  every node in the vnn map that is neither inactive nor this node,
  with ctdb_persistent_callback as completion callback.  When at least
  one control is pending, c is re-parented onto the state and a timer
  of ctdb->tunable.control_timeout is armed.

  NOTE(review): the switch header, the return statements and any
  assignment to *async_reply are not visible in this excerpt.
  NOTE(review): m->db_id is read without a visible check that recdata
  is large enough to hold a marshall buffer header - confirm against
  the caller.
*/
98 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
99 struct ctdb_req_control *c,
100 TDB_DATA recdata, bool *async_reply)
102 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
103 struct ctdb_persistent_state *state;
105 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
106 struct ctdb_db_context *ctdb_db;
/* commits are not accepted while a recovery is in progress */
108 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
109 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
113 ctdb_db = find_ctdb_db(ctdb, m->db_id);
114 if (ctdb_db == NULL) {
115 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
116 "Unknown database 0x%08x\n", m->db_id));
120 if (client == NULL) {
121 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
125 /* handling num_persistent_updates is a bit strange -
127 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
128 They don't expect num_persistent_updates to be used at all
130 2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
131 this commit to then decrement it
133 3) new clients which use TRANS2 commit functions, and
134 expect this function to increment the counter, and
135 then have it decremented in ctdb_control_trans2_error
136 or ctdb_control_trans2_finished
139 case CTDB_CONTROL_PERSISTENT_STORE:
140 if (ctdb_db->transaction_active) {
141 DEBUG(DEBUG_ERR, (__location__ " trans2_commit client db_id[%d] transaction active - refusing persistent store\n",
145 if (client->num_persistent_updates > 0) {
146 client->num_persistent_updates--;
149 case CTDB_CONTROL_TRANS2_COMMIT:
150 if (ctdb_db->transaction_active) {
151 DEBUG(DEBUG_ERR,(__location__ " trans2_commit: client "
152 "already has a transaction commit "
153 "active on db_id[%d]\n",
157 if (client->db_id != 0) {
158 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
159 "client-db_id[%d] != 0\n",
163 client->num_persistent_updates++;
164 ctdb_db->transaction_active = true;
165 client->db_id = m->db_id;
166 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
167 " commit transaction on db id[0x%08x]\n",
168 client->client_id, client->db_id));
170 case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
171 /* already updated from the first commit */
172 if (client->db_id != m->db_id) {
173 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
174 "retry: client-db_id[%d] != db_id[%d]"
175 "\n", client->db_id, m->db_id));
178 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
179 "transaction commit retry on "
181 client->client_id, client->db_id));
/* zeroed state, parented to ctdb, shared by all per-node callbacks below */
185 state = talloc_zero(ctdb, struct ctdb_persistent_state);
186 CTDB_NO_MEMORY(ctdb, state);
/* fan the update out to the nodes listed in the vnn map */
191 for (i=0;i<ctdb->vnn_map->size;i++) {
192 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
195 /* only send to active nodes */
196 if (node->flags & NODE_FLAGS_INACTIVE) {
200 /* don't send to ourselves */
201 if (node->pnn == ctdb->pnn) {
205 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
206 c->client_id, 0, recdata,
207 ctdb_persistent_callback, state);
209 DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
/* one more reply that ctdb_persistent_callback has to wait for */
214 state->num_pending++;
/* no control is outstanding, so there is no reply to wait for */
218 if (state->num_pending == 0) {
223 /* we need to wait for the replies */
226 /* need to keep the control structure around */
227 talloc_steal(state, c);
229 /* but we won't wait forever */
230 event_add_timed(ctdb->ev, state,
231 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
232 ctdb_persistent_store_timeout, state);
/*
  Context for one local write of replicated persistent records: the
  target database, the marshalled records to store and the control
  that is answered when the write has finished or timed out.
  NOTE(review): the closing brace is not visible in this excerpt.
*/
238 struct ctdb_persistent_write_state {
239 struct ctdb_db_context *ctdb_db;
/* records walked by ctdb_persistent_store() */
240 struct ctdb_marshall_buffer *m;
/* control replied to by the write callback or the lock timeout handler */
241 struct ctdb_req_control *c;
246 called from a child process to write the data
/*
  Write every record of state->m into the local tdb of state->ctdb_db
  inside a single tdb transaction.  For each of the m->count records
  the existing record is fetched first; the new record is stored with
  ctdb_ltdb_store() and the transaction is committed after the loop.
  ctdb_childwrite() calls this in the forked child.
  NOTE(review): the tests on ret, the return statements and the label
  in front of the final tdb_transaction_cancel() are not visible in
  this excerpt.
*/
248 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
251 struct ctdb_rec_data *rec = NULL;
252 struct ctdb_marshall_buffer *m = state->m;
/* all records are written under one tdb transaction */
254 ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
256 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
257 state->ctdb_db->db_id));
261 for (i=0;i<m->count;i++) {
262 struct ctdb_ltdb_header oldheader;
263 struct ctdb_ltdb_header header;
264 TDB_DATA key, data, olddata;
/* scratch context for the old record; freed on every path below */
265 TALLOC_CTX *tmp_ctx = talloc_new(state);
/* rec is passed back in so the walk continues after the previous record */
267 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
270 DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
271 i, state->ctdb_db->db_id));
272 talloc_free(tmp_ctx);
276 /* fetch the old header and ensure the rsn is less than the new rsn */
277 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
279 DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
280 state->ctdb_db->db_id));
281 talloc_free(tmp_ctx);
/* an existing record whose rsn is not lower is only tolerated when
   its data is byte-for-byte the same as the new data */
285 if (oldheader.rsn >= header.rsn &&
286 (olddata.dsize != data.dsize ||
287 memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
288 DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
289 state->ctdb_db->db_id,
290 (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
291 talloc_free(tmp_ctx);
295 talloc_free(tmp_ctx);
297 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
299 DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
300 state->ctdb_db->db_id));
305 ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
307 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
308 state->ctdb_db->db_id));
/* presumably the shared failure exit (label not visible): abandon the transaction */
315 tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
321 called when the child has completed the persistent write
/*
  Callback handed to ctdb_childwrite() by ctdb_control_update_record().
  private_data is the struct ctdb_persistent_write_state of the write;
  the stored control is answered with the status that was passed in
  and no error message.
  NOTE(review): any cleanup of the state after the reply is not visible
  in this excerpt.
*/
324 static void ctdb_persistent_write_callback(int status, void *private_data)
326 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
327 struct ctdb_persistent_write_state);
330 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
336 called if our lockwait child times out
/*
  Timer handler registered with event_add_timed() by
  ctdb_control_update_record().  private_data is the
  struct ctdb_persistent_write_state of the write; the stored control
  is answered with status -1 and a fixed timeout message.
  NOTE(review): any cleanup of the state after the reply is not visible
  in this excerpt.
*/
338 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
339 struct timeval t, void *private_data)
341 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
342 struct ctdb_persistent_write_state);
343 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
/*
  Handle for one forked writer child created by ctdb_childwrite().
  NOTE(review): the code also uses child, private_data and fd members;
  their declarations and the closing brace are not visible in this
  excerpt.
*/
347 struct childwrite_handle {
348 struct ctdb_context *ctdb;
349 struct ctdb_db_context *ctdb_db;
/* read event on the pipe from the child, serviced by childwrite_handler() */
350 struct fd_event *fde;
/* invoked from childwrite_handler() together with private_data */
354 void (*callback)(int, void *);
/* set when the handle is created; used for the latency statistic */
355 struct timeval start_time;
/*
  talloc destructor installed on the handle by ctdb_childwrite().
  Takes the handle out of the pending_childwrite_calls statistic and
  sends SIGKILL to the writer child.
  NOTE(review): the return statement is not visible in this excerpt.
*/
358 static int childwrite_destructor(struct childwrite_handle *h)
360 h->ctdb->statistics.pending_childwrite_calls--;
361 kill(h->child, SIGKILL);
365 /* called when the child process has finished writing the record to the
/*
  fd event handler for the read end of the pipe shared with the writer
  child.  private_data is the struct childwrite_handle.  It records
  the latency, reads one byte from the pipe, sends SIGKILL to the
  child and frees the handle together with a temporary context.
  NOTE(review): the declarations of c and ret and the lines that
  invoke the callback are not visible in this excerpt.
*/
368 static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
369 uint16_t flags, void *private_data)
371 struct childwrite_handle *h = talloc_get_type(private_data,
372 struct childwrite_handle);
/* copies taken up front; h itself is re-parented to tmp_ctx and freed with it */
373 void *p = h->private_data;
374 void (*callback)(int, void *) = h->callback;
375 pid_t child = h->child;
376 TALLOC_CTX *tmp_ctx = talloc_new(ev);
380 ctdb_latency(h->ctdb_db, "persistent", &h->ctdb->statistics.max_childwrite_latency, h->start_time);
/* the destructor is cleared further down, so the pending counter is dropped here instead */
381 h->ctdb->statistics.pending_childwrite_calls--;
383 /* the handle needs to go away when the context is gone - when
384 the handle goes away this implicitly closes the pipe, which
386 talloc_steal(tmp_ctx, h);
388 talloc_set_destructor(h, NULL);
390 ret = read(h->fd[0], &c, 1);
392 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
398 kill(child, SIGKILL);
399 talloc_free(tmp_ctx);
402 /* this creates a child process which will take out a tdb transaction
403 and write the record to the database.
/*
  Fork a child that runs ctdb_persistent_store(state) and reports back
  over a pipe.

  The handle is allocated as a talloc child of state.  In the child the
  read end of the pipe is closed, the store is run, one byte is written
  to the pipe and the child then loops for as long as the parent pid
  still exists.  In the parent the write end is closed, a destructor is
  installed on the handle and an fd event on the read end dispatches to
  childwrite_handler().  childwrite_calls is counted once per call;
  pending_childwrite_calls is raised on entry and lowered again on each
  visible failure path.

  NOTE(review): the declarations of ret and c, the return statements,
  the frees on the failure paths and the body of the wait loop are not
  visible in this excerpt.
  NOTE(review): the result of write() on the pipe is not checked in
  the visible code.
*/
405 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
406 void (*callback)(int, void *private_data),
407 struct ctdb_persistent_write_state *state)
409 struct childwrite_handle *result;
/* remembered before fork() so the child can poll for the parent */
411 pid_t parent = getpid();
413 ctdb_db->ctdb->statistics.childwrite_calls++;
414 ctdb_db->ctdb->statistics.pending_childwrite_calls++;
416 if (!(result = talloc_zero(state, struct childwrite_handle))) {
417 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
421 ret = pipe(result->fd);
425 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
429 result->child = fork();
/* fork failed: close both pipe ends and undo the pending count */
431 if (result->child == (pid_t)-1) {
432 close(result->fd[0]);
433 close(result->fd[1]);
435 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
439 result->callback = callback;
440 result->private_data = state;
441 result->ctdb = ctdb_db->ctdb;
442 result->ctdb_db = ctdb_db;
/* child side: write the records, then signal the parent through the pipe */
444 if (result->child == 0) {
447 close(result->fd[0]);
448 ret = ctdb_persistent_store(state);
450 DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
454 write(result->fd[1], &c, 1);
456 /* make sure we die when our parent dies */
457 while (kill(parent, 0) == 0 || errno != ESRCH) {
/* parent side: keep only the read end of the pipe */
463 close(result->fd[1]);
464 set_close_on_exec(result->fd[0]);
466 talloc_set_destructor(result, childwrite_destructor);
468 DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
470 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
471 EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
473 if (result->fde == NULL) {
475 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
/* start of the interval reported through ctdb_latency() in childwrite_handler() */
479 result->start_time = timeval_current();
485 update a record on this node if the new record has a higher rsn than the
/*
  Handle CTDB_CONTROL_UPDATE_RECORD: store replicated persistent
  records on this node.

  recdata.dptr is read as a struct ctdb_marshall_buffer and its db_id
  selects the database.  The request is refused while recovery is
  active or when the database is unknown.  Otherwise a write state is
  allocated on ctdb, a writer child is started with ctdb_childwrite(),
  c is re-parented onto the state and a timer of
  ctdb->tunable.control_timeout is armed with
  ctdb_persistent_lock_timeout as handler.

  NOTE(review): the last parameter, the assignments of state->m and
  state->c, the return statements and any assignment to the async
  reply flag are not visible in this excerpt.
  NOTE(review): m->db_id is read without a visible check that recdata
  is large enough to hold a marshall buffer header - confirm against
  the caller.
*/
488 int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
489 struct ctdb_req_control *c, TDB_DATA recdata,
492 struct ctdb_db_context *ctdb_db;
493 struct ctdb_persistent_write_state *state;
494 struct childwrite_handle *handle;
495 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
/* updates are not accepted while a recovery is in progress */
497 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
498 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
502 ctdb_db = find_ctdb_db(ctdb, m->db_id);
503 if (ctdb_db == NULL) {
504 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
/* talloc() does not zero the state, so every member has to be assigned before use */
508 state = talloc(ctdb, struct ctdb_persistent_write_state);
509 CTDB_NO_MEMORY(ctdb, state);
511 state->ctdb_db = ctdb_db;
515 /* create a child process to take out a transaction and
518 handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
519 if (handle == NULL) {
520 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
525 /* we need to wait for the replies */
528 /* need to keep the control structure around */
529 talloc_steal(state, c);
531 /* but we won't wait forever */
532 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
533 ctdb_persistent_lock_timeout, state);
540 called when a client has finished a local commit in a transaction to
541 a persistent database
/*
  A client reports that its local transaction commit has finished.

  The database is looked up through client->db_id.  The call is logged
  as an error when the database is unknown or has no transaction
  commit marked active.  Otherwise transaction_active is cleared and
  the client's num_persistent_updates is decremented; a counter that
  is already zero is logged and switches the daemon's recovery_mode to
  CTDB_RECOVERY_ACTIVE.

  NOTE(review): client is dereferenced without a NULL check, unlike
  ctdb_control_trans2_commit() - ctdb_reqid_find() presumably returns
  NULL for an unknown client_id; confirm and guard.
  NOTE(review): the return statements and any reset of client->db_id
  are not visible in this excerpt.
*/
543 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
544 struct ctdb_req_control *c)
546 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
547 struct ctdb_db_context *ctdb_db;
549 ctdb_db = find_ctdb_db(ctdb, client->db_id);
550 if (ctdb_db == NULL) {
551 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
552 "Unknown database 0x%08x\n", client->db_id));
555 if (!ctdb_db->transaction_active) {
556 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
557 "Database 0x%08x has no transaction commit "
558 "started\n", client->db_id));
562 ctdb_db->transaction_active = false;
/* the counter should have been raised by the matching commit; zero means the bookkeeping is off */
565 if (client->num_persistent_updates == 0) {
566 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
567 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
568 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
571 client->num_persistent_updates--;
573 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
574 "transaction commit db_id[0x%08x]\n",
575 client->client_id, ctdb_db->db_id));
581 called when a client gets an error committing its database
582 during a transaction commit
/*
  A client reports that committing its transaction failed.

  The database is looked up through client->db_id.  The call is logged
  as an error when the database is unknown or has no transaction
  commit marked active.  Otherwise transaction_active is cleared, the
  client's num_persistent_updates is checked and decremented, and the
  daemon's recovery_mode is set to CTDB_RECOVERY_ACTIVE.

  NOTE(review): client is dereferenced without a NULL check, unlike
  ctdb_control_trans2_commit() - ctdb_reqid_find() presumably returns
  NULL for an unknown client_id; confirm and guard.
  NOTE(review): the return statements and the branch structure around
  the decrement are not visible in this excerpt.
*/
584 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
585 struct ctdb_req_control *c)
587 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
588 struct ctdb_db_context *ctdb_db;
590 ctdb_db = find_ctdb_db(ctdb, client->db_id);
591 if (ctdb_db == NULL) {
592 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
593 "Unknown database 0x%08x\n", client->db_id));
596 if (!ctdb_db->transaction_active) {
597 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
598 "Database 0x%08x has no transaction commit "
599 "started\n", client->db_id));
603 ctdb_db->transaction_active = false;
606 if (client->num_persistent_updates == 0) {
607 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
609 client->num_persistent_updates--;
/* a failed commit puts the daemon into recovery mode */
612 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
613 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
619 * Tell whether a transaction is active on this node on the given DB.
/*
  Query whether a transaction commit is marked active on database
  db_id.  An unknown database is logged as an error.  The visible
  code distinguishes the case where the calling client itself is
  bound to db_id from the case where the database has
  transaction_active set.

  NOTE(review): the db_id parameter declaration, the NULL test in
  front of the DEBUG call and all return values are not visible in
  this excerpt.
  NOTE(review): client is dereferenced without a NULL check -
  ctdb_reqid_find() presumably returns NULL for an unknown client_id;
  confirm and guard.
*/
621 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
622 struct ctdb_req_control *c,
625 struct ctdb_db_context *ctdb_db;
626 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
628 ctdb_db = find_ctdb_db(ctdb, db_id);
630 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
/* the caller is the client bound to this database */
634 if (client->db_id == db_id) {
638 if (ctdb_db->transaction_active) {
646 backwards compatibility:
648 start a persistent store operation. passing both the key, header and
649 data to the daemon. If the client disconnects before it has issued
650 a persistent_update call to the daemon we trigger a full recovery
651 to ensure the databases are brought back in sync.
652 for now we ignore the recdata that the client has passed to us.
/*
  Backwards-compatibility control: note that the client has started a
  persistent update by incrementing its num_persistent_updates
  counter.  A client_id that cannot be matched to a client is logged
  as an error.
  NOTE(review): the last parameter and the return statements are not
  visible in this excerpt.
*/
654 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
655 struct ctdb_req_control *c,
658 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
660 if (client == NULL) {
661 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
665 client->num_persistent_updates++;
671 backwards compatibility:
673 called to tell ctdbd that it is no longer doing a persistent update
/*
  Backwards-compatibility control: the client is no longer doing a
  persistent update.  Its num_persistent_updates counter is
  decremented, but never below zero.  A client_id that cannot be
  matched to a client is logged as an error.
  NOTE(review): the last parameter and the return statements are not
  visible in this excerpt.
*/
675 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
676 struct ctdb_req_control *c,
679 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
681 if (client == NULL) {
682 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
/* guard against underflow when no update was registered */
686 if (client->num_persistent_updates > 0) {
687 client->num_persistent_updates--;
695 backwards compatibility:
697 single record variant of ctdb_control_trans2_commit for older clients
/*
  Backwards-compatibility entry point for clients that send a single
  struct ctdb_rec_data instead of a marshall buffer.  The record is
  size-checked, split into key and data, wrapped into a marshall
  buffer with ctdb_marshall_add() and passed on to
  ctdb_control_trans2_commit().

  NOTE(review): rec->keylen and rec->datalen are read inside the size
  check itself; no earlier check that recdata.dsize covers the record
  header is visible - confirm and guard against short input.
  NOTE(review): the declarations of key and data and the return after
  the size check are not visible in this excerpt.
*/
699 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
700 struct ctdb_req_control *c,
701 TDB_DATA recdata, bool *async_reply)
703 struct ctdb_marshall_buffer *m;
704 struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
/* the buffer must be exactly header + key + data */
707 if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
708 rec->keylen + rec->datalen) {
709 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
/* key and data point into recdata: the key comes first, the data follows it */
713 key.dptr = &rec->data[0];
714 key.dsize = rec->keylen;
715 data.dptr = &rec->data[rec->keylen];
716 data.dsize = rec->datalen;
/* rec->reqid is passed for both integer arguments; c is presumably the talloc parent - TODO confirm */
718 m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
719 CTDB_NO_MEMORY(ctdb, m);
721 return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);