4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/events/events.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
/* Bookkeeping for one in-flight trans2_commit fan-out: the original
   control request (c), which is answered once every peer has replied or
   the timeout fires, and counters used to classify the overall result.
   NOTE(review): other members used in this file (status, errormsg,
   num_pending) are declared on lines not visible in this listing. */
29 struct ctdb_persistent_state {
30 struct ctdb_context *ctdb;
31 struct ctdb_req_control *c;
/* compared against each other when the last reply arrives:
   all failed -> ALLFAIL, some failed -> SOMEFAIL, none -> SUCCESS */
35 uint32_t num_failed, num_sent;
39 1) all nodes fail, and all nodes reply
40 2) some nodes fail, all nodes reply
46 called when a node has acknowledged a ctdb_control_update_record call
/* Reply callback for each CTDB_CONTROL_UPDATE_RECORD control sent by
   ctdb_control_trans2_commit(); private_data is that call's
   ctdb_persistent_state. When no replies remain pending, the aggregate
   outcome is sent back to the original requester as a
   ctdb_trans2_commit_error value, together with the most recently
   recorded error message. */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49 int32_t status, TDB_DATA data,
53 struct ctdb_persistent_state *state = talloc_get_type(private_data,
54 struct ctdb_persistent_state);
/* failure path (its guarding condition is not visible in this listing):
   log, then remember the failing status and message */
57 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
59 state->status = status;
60 state->errormsg = errormsg;
/* NOTE(review): the updates of num_failed / num_pending are on lines not
   visible in this listing. */
64 if (state->num_pending == 0) {
65 enum ctdb_trans2_commit_error etype;
/* classify the fan-out: every peer failed, some failed, or none failed */
66 if (state->num_failed == state->num_sent) {
67 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
68 } else if (state->num_failed != 0) {
69 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
71 etype = CTDB_TRANS2_COMMIT_SUCCESS;
73 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
79 called if persistent store times out
/* Timer callback armed by ctdb_control_trans2_commit() with
   ctdb->tunable.control_timeout seconds: if the peers have not all replied
   by then, the original control is answered with
   CTDB_TRANS2_COMMIT_TIMEOUT. */
81 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te,
82 struct timeval t, void *private_data)
84 struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
86 ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT,
87 "timeout in ctdb_persistent_state");
93 store a set of persistent records - called from a ctdb client when it has updated
94 some records in a persistent database. The client will have the record
95 locked for the duration of this call. The client is the dmaster when
/* Control handler shared by CTDB_CONTROL_TRANS2_COMMIT,
   CTDB_CONTROL_TRANS2_COMMIT_RETRY and CTDB_CONTROL_PERSISTENT_STORE
   (the last one reached through ctdb_control_persistent_store()).

   recdata holds a struct ctdb_marshall_buffer whose db_id selects the
   persistent database. After the request is validated and the client's
   transaction bookkeeping is updated, recdata is forwarded unchanged to
   every other active node in the vnn_map as CTDB_CONTROL_UPDATE_RECORD.
   The reply to the client is deferred. It is sent from
   ctdb_persistent_callback() once all peers have answered, or from
   ctdb_persistent_store_timeout() after control_timeout seconds.

   NOTE(review): the switch header, the return statements and the
   assignments to *async_reply are on lines not visible in this listing. */
98 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
99 struct ctdb_req_control *c,
100 TDB_DATA recdata, bool *async_reply)
102 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
103 struct ctdb_persistent_state *state;
105 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
106 struct ctdb_db_context *ctdb_db;
/* no persistent writes are accepted while a recovery is running */
108 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
109 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
113 ctdb_db = find_ctdb_db(ctdb, m->db_id);
114 if (ctdb_db == NULL) {
115 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
116 "Unknown database 0x%08x\n", m->db_id));
120 if (client == NULL) {
121 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
125 /* handling num_persistent_updates is a bit strange -
127 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
128 They don't expect num_persistent_updates to be used at all
130 2) less old clients, which use CTDB_CONTROL_START_PERSISTENT_UPDATE, and expect
131 this commit to then decrement it
133 3) new clients which use TRANS2 commit functions, and
134 expect this function to increment the counter, and
135 then have it decremented in ctdb_control_trans2_error
136 or ctdb_control_trans2_finished
139 case CTDB_CONTROL_PERSISTENT_STORE:
/* legacy single-record store: refused while a trans2 commit is active on
   this db; otherwise drops the counter that
   CTDB_CONTROL_START_PERSISTENT_UPDATE may have raised */
140 if (ctdb_db->transaction_active) {
141 DEBUG(DEBUG_ERR, (__location__ " trans2_commit client db_id[%d] transaction active - refusing persistent store\n",
145 if (client->num_persistent_updates > 0) {
146 client->num_persistent_updates--;
149 case CTDB_CONTROL_TRANS2_COMMIT:
/* first commit attempt: only one transaction commit per db at a time, and
   the client must not already be tied to a db. Then record both. */
150 if (ctdb_db->transaction_active) {
151 DEBUG(DEBUG_ERR,(__location__ " trans2_commit: client "
152 "already has a transaction commit "
153 "active on db_id[%d]\n",
157 if (client->db_id != 0) {
158 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
159 "client-db_id[%d] != 0\n",
163 client->num_persistent_updates++;
164 ctdb_db->transaction_active = true;
165 client->db_id = m->db_id;
167 case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
168 /* already updated from the first commit */
169 if (client->db_id != m->db_id) {
170 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
171 "retry: client-db_id[%d] != db_id[%d]"
172 "\n", client->db_id, m->db_id));
/* tracking state for this fan-out (talloc child of ctdb) */
178 state = talloc_zero(ctdb, struct ctdb_persistent_state);
179 CTDB_NO_MEMORY(ctdb, state);
/* forward the records to every other active node in the vnn_map */
184 for (i=0;i<ctdb->vnn_map->size;i++) {
185 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
188 /* only send to active nodes */
189 if (node->flags & NODE_FLAGS_INACTIVE) {
193 /* don't send to ourselves */
194 if (node->pnn == ctdb->pnn) {
198 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
199 c->client_id, 0, recdata,
200 ctdb_persistent_callback, state);
202 DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
207 state->num_pending++;
/* NOTE(review): how the "no peer was contacted" case is answered is on
   lines not visible in this listing. */
211 if (state->num_pending == 0) {
216 /* we need to wait for the replies */
219 /* need to keep the control structure around */
220 talloc_steal(state, c);
222 /* but we won't wait forever */
223 event_add_timed(ctdb->ev, state,
224 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
225 ctdb_persistent_store_timeout, state);
/* Per-request state for a CTDB_CONTROL_UPDATE_RECORD being applied on this
   node. It holds:
   - the target database,
   - the marshalled records to write (consumed by ctdb_persistent_store()),
   - the control to answer when the writer child finishes or the timeout
     fires. */
231 struct ctdb_persistent_write_state {
232 struct ctdb_db_context *ctdb_db;
233 struct ctdb_marshall_buffer *m;
234 struct ctdb_req_control *c;
239 called from a child process to write the data
/* Runs in the child process forked by ctdb_childwrite(). Applies every
   record in state->m to the local tdb inside a single tdb transaction.

   For each record, the current local copy is fetched first. If the local
   RSN is greater than or equal to the new RSN and the data differs, the
   record is treated as a conflict and the update fails. Otherwise (older
   local RSN, or identical data) the new record is stored.

   NOTE(review): the return statements and the jumps to the failure path
   are on lines not visible in this listing. The visible failure path
   cancels the transaction with tdb_transaction_cancel(). */
241 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
244 struct ctdb_rec_data *rec = NULL;
245 struct ctdb_marshall_buffer *m = state->m;
247 ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
249 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
250 state->ctdb_db->db_id));
254 for (i=0;i<m->count;i++) {
255 struct ctdb_ltdb_header oldheader;
256 struct ctdb_ltdb_header header;
257 TDB_DATA key, data, olddata;
/* per-record scratch context; owns the fetched old data */
258 TALLOC_CTX *tmp_ctx = talloc_new(state);
260 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
263 DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
264 i, state->ctdb_db->db_id));
265 talloc_free(tmp_ctx);
269 /* fetch the old header and ensure the rsn is less than the new rsn, unless the data is unchanged */
270 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
272 DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
273 state->ctdb_db->db_id));
274 talloc_free(tmp_ctx);
/* The size comparison short-circuits, so memcmp only runs on equal-sized
   buffers.
   NOTE(review): the log text says "larger", but the test is >=, so an
   equal RSN with different data is also refused here. */
278 if (oldheader.rsn >= header.rsn &&
279 (olddata.dsize != data.dsize ||
280 memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
281 DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
282 state->ctdb_db->db_id,
283 (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
284 talloc_free(tmp_ctx);
288 talloc_free(tmp_ctx);
290 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
292 DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
293 state->ctdb_db->db_id));
298 ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
300 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
301 state->ctdb_db->db_id));
308 tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
314 called when the child has completed the persistent write
/* Completion callback passed to ctdb_childwrite() by
   ctdb_control_update_record(). It answers the pending UPDATE_RECORD
   control with the writer child's status; private_data is the
   ctdb_persistent_write_state. */
317 static void ctdb_persistent_write_callback(int status, void *private_data)
319 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
320 struct ctdb_persistent_write_state);
323 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
329 called if our lockwait child times out
/* Timer callback armed by ctdb_control_update_record() with
   ctdb->tunable.control_timeout seconds. It answers the UPDATE_RECORD
   control with -1 if the writer child has not reported back in time. */
331 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
332 struct timeval t, void *private_data)
334 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
335 struct ctdb_persistent_write_state);
336 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
/* Parent-side handle for one forked persistent-write child, created by
   ctdb_childwrite().
   NOTE(review): the members fd[], child and private_data, used elsewhere
   in this file, are declared on lines not visible in this listing. */
340 struct childwrite_handle {
341 struct ctdb_context *ctdb;
342 struct ctdb_db_context *ctdb_db;
/* watches the read end of the child's result pipe */
343 struct fd_event *fde;
/* completion callback: (status, private_data) */
347 void (*callback)(int, void *);
/* feeds the max_childwrite_latency statistic */
348 struct timeval start_time;
/* talloc destructor. It is installed by ctdb_childwrite() and removed by
   childwrite_handler() before that handler frees the handle. If the handle
   is freed while the child is still outstanding, this drops the
   pending-call counter and kills the child. */
351 static int childwrite_destructor(struct childwrite_handle *h)
353 h->ctdb->statistics.pending_childwrite_calls--;
354 kill(h->child, SIGKILL);
358 /* called when the child process has finished writing the record to the
/* fd event handler for the read end of the writer child's pipe.
   It updates the latency and pending-call statistics, reads the one-byte
   result the child writes, and releases the handle. The callback, its
   private data and the child pid are copied to locals first, because the
   handle is moved under tmp_ctx and freed at the end of this function.
   NOTE(review): the lines that turn the byte read into a status and invoke
   the callback are not visible in this listing. */
361 static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
362 uint16_t flags, void *private_data)
364 struct childwrite_handle *h = talloc_get_type(private_data,
365 struct childwrite_handle);
366 void *p = h->private_data;
367 void (*callback)(int, void *) = h->callback;
368 pid_t child = h->child;
369 TALLOC_CTX *tmp_ctx = talloc_new(ev);
373 ctdb_latency(h->ctdb_db, "persistent", &h->ctdb->statistics.max_childwrite_latency, h->start_time);
374 h->ctdb->statistics.pending_childwrite_calls--;
376 /* the handle needs to go away when the context is gone - when
377 the handle goes away this implicitly closes the pipe, which
379 talloc_steal(tmp_ctx, h);
/* the pending counter was already decremented above; detach the
   destructor so freeing h does not decrement it a second time */
381 talloc_set_destructor(h, NULL);
383 ret = read(h->fd[0], &c, 1);
385 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
/* with the destructor detached, the child has to be killed explicitly */
391 kill(child, SIGKILL);
392 talloc_free(tmp_ctx);
395 /* this creates a child process which will take out a tdb transaction
396 and write the record to the database.
/* Fork a child that runs ctdb_persistent_store(state) and reports its
   result as a single byte on a pipe. The parent keeps the read end and
   watches it with childwrite_handler(). callback and state are stored on
   the returned handle, which is a talloc child of state.
   NOTE(review): the failure returns (presumably NULL, which the caller
   tests for) are on lines not visible in this listing. */
398 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
399 void (*callback)(int, void *private_data),
400 struct ctdb_persistent_write_state *state)
402 struct childwrite_handle *result;
404 pid_t parent = getpid();
406 ctdb_db->ctdb->statistics.childwrite_calls++;
407 ctdb_db->ctdb->statistics.pending_childwrite_calls++;
409 if (!(result = talloc_zero(state, struct childwrite_handle))) {
410 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
414 ret = pipe(result->fd);
418 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
422 result->child = fork();
424 if (result->child == (pid_t)-1) {
425 close(result->fd[0]);
426 close(result->fd[1]);
428 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
432 result->callback = callback;
433 result->private_data = state;
434 result->ctdb = ctdb_db->ctdb;
435 result->ctdb_db = ctdb_db;
/* child: perform the write and report the result over the pipe */
437 if (result->child == 0) {
440 close(result->fd[0]);
441 ret = ctdb_persistent_store(state);
443 DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
/* NOTE(review): the return value of write() is not checked */
447 write(result->fd[1], &c, 1);
449 /* make sure we die when our parent dies */
450 while (kill(parent, 0) == 0 || errno != ESRCH) {
/* parent side: close the write end and watch the read end */
456 close(result->fd[1]);
457 talloc_set_destructor(result, childwrite_destructor);
459 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
460 EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
462 if (result->fde == NULL) {
464 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
468 result->start_time = timeval_current();
474 update a record on this node if the new record has a higher rsn than the
/* Handler for CTDB_CONTROL_UPDATE_RECORD, the control that
   ctdb_control_trans2_commit() sends to the other nodes. recdata is a
   ctdb_marshall_buffer. The records are written by a forked child
   (ctdb_childwrite). The reply is deferred until
   ctdb_persistent_write_callback() or ctdb_persistent_lock_timeout()
   runs.
   NOTE(review): the return statements and *async_reply handling are on
   lines not visible in this listing. */
477 int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
478 struct ctdb_req_control *c, TDB_DATA recdata,
481 struct ctdb_db_context *ctdb_db;
482 struct ctdb_persistent_write_state *state;
483 struct childwrite_handle *handle;
484 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
/* no persistent writes are accepted while a recovery is running */
486 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
487 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
491 ctdb_db = find_ctdb_db(ctdb, m->db_id);
492 if (ctdb_db == NULL) {
493 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
497 state = talloc(ctdb, struct ctdb_persistent_write_state);
498 CTDB_NO_MEMORY(ctdb, state);
500 state->ctdb_db = ctdb_db;
/* NOTE(review): state comes from talloc() and is not zeroed. The
   assignments of state->m and state->c are not visible in this listing;
   confirm they precede ctdb_childwrite(). */
504 /* create a child process to take out a transaction and
507 handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
508 if (handle == NULL) {
509 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
514 /* we need to wait for the replies */
517 /* need to keep the control structure around */
518 talloc_steal(state, c);
520 /* but we won't wait forever */
521 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
522 ctdb_persistent_lock_timeout, state);
529 called when a client has finished a local commit in a transaction to
530 a persistent database
/* Called when a client has finished the local commit of a trans2
   transaction. It clears the database's transaction_active flag and
   decrements the client's num_persistent_updates counter. If that counter
   is already zero, it logs an error and forces a recovery by setting
   recovery_mode to CTDB_RECOVERY_ACTIVE.
   NOTE(review): unlike ctdb_control_trans2_commit(), no NULL check on the
   ctdb_reqid_find() result is visible before client is dereferenced. */
532 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
533 struct ctdb_req_control *c)
535 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
536 struct ctdb_db_context *ctdb_db;
/* the db was recorded on the client by ctdb_control_trans2_commit() */
538 ctdb_db = find_ctdb_db(ctdb, client->db_id);
539 if (ctdb_db == NULL) {
540 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
541 "Unknown database 0x%08x\n", client->db_id));
544 if (!ctdb_db->transaction_active) {
545 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
546 "Database 0x%08x has no transaction commit "
547 "started\n", client->db_id));
551 ctdb_db->transaction_active = false;
/* the counter should have been raised by the commit; if not, the nodes'
   bookkeeping is inconsistent, so force a recovery */
554 if (client->num_persistent_updates == 0) {
555 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
556 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
557 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
560 client->num_persistent_updates--;
566 called when a client gets an error committing its database
567 during a transaction commit
/* Called when a client failed to commit its local transaction. It clears
   the database's transaction_active flag, drops the client's
   num_persistent_updates counter (logging an error if it is already zero),
   and forces a recovery by setting recovery_mode to CTDB_RECOVERY_ACTIVE.
   NOTE(review): no NULL check on the ctdb_reqid_find() result is visible
   before client is dereferenced.
   NOTE(review): the else/closing lines around the decrement are not
   visible; confirm the decrement is skipped when the counter is zero. */
569 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
570 struct ctdb_req_control *c)
572 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
573 struct ctdb_db_context *ctdb_db;
575 ctdb_db = find_ctdb_db(ctdb, client->db_id);
576 if (ctdb_db == NULL) {
577 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
578 "Unknown database 0x%08x\n", client->db_id));
581 if (!ctdb_db->transaction_active) {
582 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
583 "Database 0x%08x has no transaction commit "
584 "started\n", client->db_id));
588 ctdb_db->transaction_active = false;
591 if (client->num_persistent_updates == 0) {
592 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
594 client->num_persistent_updates--;
597 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
598 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
605 backwards compatibility:
607 start a persistent store operation. passing both the key, header and
608 data to the daemon. If the client disconnects before it has issued
609 a persistent_update call to the daemon we trigger a full recovery
610 to ensure the databases are brought back in sync.
611 for now we ignore the recdata that the client has passed to us.
/* Legacy control: increments the calling client's num_persistent_updates
   counter. Fails if the request cannot be matched to a client. */
613 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
614 struct ctdb_req_control *c,
617 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
619 if (client == NULL) {
620 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
624 client->num_persistent_updates++;
630 backwards compatibility:
632 called to tell ctdbd that it is no longer doing a persistent update
/* Legacy control: decrements the calling client's num_persistent_updates
   counter if it is non-zero. Fails if the request cannot be matched to a
   client. */
634 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
635 struct ctdb_req_control *c,
638 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
640 if (client == NULL) {
641 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
645 if (client->num_persistent_updates > 0) {
646 client->num_persistent_updates--;
654 backwards compatibility:
656 single record variant of ctdb_control_trans2_commit for older clients
/* Legacy CTDB_CONTROL_PERSISTENT_STORE handler. recdata is a single struct
   ctdb_rec_data (key bytes followed by data bytes). After a size check it
   is wrapped in a one-record ctdb_marshall_buffer and handed to
   ctdb_control_trans2_commit().
   NOTE(review): rec->keylen and rec->datalen are read before verifying
   that recdata.dsize is at least offsetof(struct ctdb_rec_data, data), so
   a short buffer is over-read. Depending on the field widths, the sum may
   also wrap. Consider checking the header size first.
   NOTE(review): rec->reqid is passed in two argument positions of
   ctdb_marshall_add(); confirm their meaning against its prototype. */
658 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
659 struct ctdb_req_control *c,
660 TDB_DATA recdata, bool *async_reply)
662 struct ctdb_marshall_buffer *m;
663 struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
/* recdata must be exactly the record header plus key plus data */
666 if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
667 rec->keylen + rec->datalen) {
668 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
672 key.dptr = &rec->data[0];
673 key.dsize = rec->keylen;
674 data.dptr = &rec->data[rec->keylen];
675 data.dsize = rec->datalen;
677 m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
678 CTDB_NO_MEMORY(ctdb, m);
680 return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);