4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/events/events.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
/*
 * Bookkeeping for one trans2 commit that is being replicated to the other
 * nodes. It is allocated in ctdb_control_trans2_commit and passed as
 * private data to ctdb_persistent_callback and ctdb_persistent_store_timeout.
 *   ctdb       - daemon context used to send the final control reply
 *   c          - the client's original control request that gets answered
 *   num_failed - compared against num_sent to classify the overall result
 *                (ALLFAIL / SOMEFAIL / SUCCESS)
 *   num_sent   - presumably the number of UPDATE_RECORD controls sent; its
 *                increment is not visible in this listing - confirm
 * NOTE(review): this listing omits some members. errormsg, status and
 * num_pending are read and written by the code below but are not shown here.
 */
29 struct ctdb_persistent_state {
30 struct ctdb_context *ctdb;
31 struct ctdb_req_control *c;
35 uint32_t num_failed, num_sent;
39 1) all nodes fail, and all nodes reply
40 2) some nodes fail, all nodes reply
46 called when a node has acknowledged a ctdb_control_update_record call
/*
 * Reply handler for each CTDB_CONTROL_UPDATE_RECORD sent by
 * ctdb_control_trans2_commit. private_data is the shared
 * struct ctdb_persistent_state for that commit.
 *
 * When a node reports a failure, its status and error message are stored in
 * the state; a later failure overwrites an earlier one. The enclosing status
 * check is not visible in this listing.
 *
 * Once state->num_pending reaches 0, the client's original control is
 * answered with the last recorded error message and:
 *   CTDB_TRANS2_COMMIT_ALLFAIL  - num_failed == num_sent
 *   CTDB_TRANS2_COMMIT_SOMEFAIL - some, but not all, sends failed
 *   CTDB_TRANS2_COMMIT_SUCCESS  - num_failed == 0
 *
 * NOTE(review): the lines that decrement num_pending, increment num_failed
 * and release the state are not visible in this listing - confirm them
 * against the full source.
 */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49 int32_t status, TDB_DATA data,
53 struct ctdb_persistent_state *state = talloc_get_type(private_data,
54 struct ctdb_persistent_state);
57 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
59 state->status = status;
60 state->errormsg = errormsg;
64 if (state->num_pending == 0) {
65 enum ctdb_trans2_commit_error etype;
/* classify the overall outcome of the fan-out */
66 if (state->num_failed == state->num_sent) {
67 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
68 } else if (state->num_failed != 0) {
69 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
71 etype = CTDB_TRANS2_COMMIT_SUCCESS;
73 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
79 called if persistent store times out
/*
 * Timer callback armed by ctdb_control_trans2_commit. It fires
 * ctdb->tunable.control_timeout seconds after the UPDATE_RECORD fan-out and
 * answers the client's trans2 commit with CTDB_TRANS2_COMMIT_TIMEOUT.
 * NOTE(review): this listing shows no release of `state`. If it is not freed
 * after replying, a late ctdb_persistent_callback that brings num_pending to
 * 0 would reply to the same control a second time - confirm against the
 * full source.
 */
81 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te,
82 struct timeval t, void *private_data)
84 struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
86 ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT,
87 "timeout in ctdb_persistent_state");
93 store a set of persistent records - called from a ctdb client when it has updated
94 some records in a persistent database. The client will have the record
95 locked for the duration of this call. The client is the dmaster when
/*
 * Replicate a marshalled set of persistent-database records to every other
 * active node. It handles the CTDB_CONTROL_PERSISTENT_STORE (via
 * ctdb_control_persistent_store), CTDB_CONTROL_TRANS2_COMMIT and
 * CTDB_CONTROL_TRANS2_COMMIT_RETRY cases listed below.
 *
 *   ctdb        - daemon context
 *   c           - the client's control request; c->client_id identifies the
 *                 client. When replies are awaited, c is reparented onto the
 *                 fan-out state and answered later by ctdb_persistent_callback
 *                 or ctdb_persistent_store_timeout.
 *   recdata     - a struct ctdb_marshall_buffer; m->db_id names the database
 *   async_reply - output flag; presumably set when the reply is deferred
 *                 (the assignment is not visible in this listing)
 *
 * Requests are rejected while recovery is active, for an unknown db_id, or
 * when c->client_id does not match a connected client. The return values of
 * those error paths are not visible in this listing.
 */
98 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
99 struct ctdb_req_control *c,
100 TDB_DATA recdata, bool *async_reply)
102 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
103 struct ctdb_persistent_state *state;
105 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
106 struct ctdb_db_context *ctdb_db;
108 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
109 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
/* NOTE(review): m->db_id is read without a visible check that recdata.dsize
   covers a struct ctdb_marshall_buffer header; confirm the size is
   validated by the caller. */
113 ctdb_db = find_ctdb_db(ctdb, m->db_id);
114 if (ctdb_db == NULL) {
115 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
116 "Unknown database 0x%08x\n", m->db_id));
120 if (client == NULL) {
121 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
125 /* handling num_persistent_updates is a bit strange -
127 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
128 They don't expect num_persistent_updates to be used at all
130 2) less old clients, which use CTDB_CONTROL_START_PERSISTENT_UPDATE, and expect
131 this commit to then decrement it
133 3) new clients which use TRANS2 commit functions, and
134 expect this function to increment the counter, and
135 then have it decremented in ctdb_control_trans2_error
136 or ctdb_control_trans2_finished
/* Per-opcode bookkeeping. The switch header is not visible in this listing;
   the case labels suggest it dispatches on c->opcode.
   NOTE(review): db_id values are printed with %d in these messages but with
   0x%08x elsewhere in this function; if db_id is unsigned (presumably
   uint32_t), %d is the wrong conversion. */
139 case CTDB_CONTROL_PERSISTENT_STORE:
140 if (ctdb_db->transaction_active) {
141 DEBUG(DEBUG_ERR, (__location__ " trans2_commit client db_id[%d] transaction active - refusing persistent store\n",
145 if (client->num_persistent_updates > 0) {
146 client->num_persistent_updates--;
149 case CTDB_CONTROL_TRANS2_COMMIT:
150 if (ctdb_db->transaction_active) {
151 DEBUG(DEBUG_ERR,(__location__ " trans2_commit: client "
152 "already has a transaction commit "
153 "active on db_id[%d]\n",
157 if (client->db_id != 0) {
158 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
159 "client-db_id[%d] != 0\n",
/* start of a new trans2 commit: mark the db busy and remember which db this
   client is committing to, so that trans2_finished/trans2_error can find it */
163 client->num_persistent_updates++;
164 ctdb_db->transaction_active = true;
165 client->db_id = m->db_id;
167 case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
168 /* already updated from the first commit */
169 if (client->db_id != m->db_id) {
170 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
171 "retry: client-db_id[%d] != db_id[%d]"
172 "\n", client->db_id, m->db_id));
/* Fan the records out as CTDB_CONTROL_UPDATE_RECORD to every other node in
   the vnn map that is not flagged inactive; replies are collected by
   ctdb_persistent_callback using this shared state. */
178 state = talloc_zero(ctdb, struct ctdb_persistent_state);
179 CTDB_NO_MEMORY(ctdb, state);
184 for (i=0;i<ctdb->vnn_map->size;i++) {
185 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
188 /* only send to active nodes */
189 if (node->flags & NODE_FLAGS_INACTIVE) {
193 /* don't send to ourselves */
194 if (node->pnn == ctdb->pnn) {
198 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
199 c->client_id, 0, recdata,
200 ctdb_persistent_callback, state);
202 DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
207 state->num_pending++;
/* no UPDATE_RECORD control is outstanding; the handling of this case is not
   visible in this listing */
211 if (state->num_pending == 0) {
216 /* we need to wait for the replies */
219 /* need to keep the control structure around */
220 talloc_steal(state, c);
222 /* but we won't wait forever */
223 event_add_timed(ctdb->ev, state,
224 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
225 ctdb_persistent_store_timeout, state);
/*
 * State for one CTDB_CONTROL_UPDATE_RECORD that is applied locally by a
 * child process (see ctdb_control_update_record and ctdb_childwrite).
 *   ctdb_db - target persistent database
 *   m       - marshalled records to write (iterated by ctdb_persistent_store)
 *   c       - control request answered by ctdb_persistent_write_callback or
 *             ctdb_persistent_lock_timeout
 */
231 struct ctdb_persistent_write_state {
232 struct ctdb_db_context *ctdb_db;
233 struct ctdb_marshall_buffer *m;
234 struct ctdb_req_control *c;
239 called from a child process to write the data
/*
 * Write every record in state->m to the local copy of state->ctdb_db inside
 * a single tdb transaction. It is called in the child forked by
 * ctdb_childwrite.
 *
 * For each record, the currently stored header and data are fetched first.
 * The write is refused when the stored RSN is >= the incoming RSN AND the
 * stored data differs from the incoming data. Otherwise the record is stored
 * with the incoming header.
 *
 * The visible error paths log a message; they presumably jump to the
 * tdb_transaction_cancel() at the end. The return values are not visible in
 * this listing.
 */
241 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
244 struct ctdb_rec_data *rec = NULL;
245 struct ctdb_marshall_buffer *m = state->m;
247 ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
249 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
250 state->ctdb_db->db_id));
254 for (i=0;i<m->count;i++) {
255 struct ctdb_ltdb_header oldheader;
256 struct ctdb_ltdb_header header;
257 TDB_DATA key, data, olddata;
/* per-record scratch context; olddata is allocated on it */
258 TALLOC_CTX *tmp_ctx = talloc_new(state);
260 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
263 DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
264 i, state->ctdb_db->db_id));
265 talloc_free(tmp_ctx);
269 /* fetch the old header and ensure the rsn is less than the new rsn */
270 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
272 DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
273 state->ctdb_db->db_id));
274 talloc_free(tmp_ctx);
/* NOTE(review): when both sizes are 0, memcmp() is still called, with
   pointers that may be NULL for empty records; standard C requires valid
   pointers even for a zero length.
   NOTE(review): if the data is identical, a record whose RSN is <= the stored
   one is not rejected, and the store below writes the incoming (possibly
   lower) RSN - confirm this is intended. */
278 if (oldheader.rsn >= header.rsn &&
279 (olddata.dsize != data.dsize ||
280 memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
/* NOTE(review): the message says "larger" but the condition also fires when
   the RSNs are equal */
281 DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
282 state->ctdb_db->db_id,
283 (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
284 talloc_free(tmp_ctx);
/* olddata is no longer needed; key/data point into m, not into tmp_ctx */
288 talloc_free(tmp_ctx);
290 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
292 DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
293 state->ctdb_db->db_id));
298 ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
300 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
301 state->ctdb_db->db_id));
/* failure path: abandon the whole transaction */
308 tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
314 called when the child has completed the persistent write
/*
 * Completion callback that ctdb_control_update_record passes to
 * ctdb_childwrite. It is stored in the childwrite_handle and picked up by
 * childwrite_handler; the call itself is not visible in this listing.
 * It answers the UPDATE_RECORD control with `status` as the result.
 * NOTE(review): release of `state` after replying is not visible here.
 */
317 static void ctdb_persistent_write_callback(int status, void *private_data)
319 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
320 struct ctdb_persistent_write_state);
323 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
329 called if our lockwait child times out
/*
 * Timer callback armed by ctdb_control_update_record, scheduled
 * ctdb->tunable.control_timeout seconds out. It answers the UPDATE_RECORD
 * control with -1 and the message "timeout in ctdb_persistent_lock".
 * NOTE(review): release of `state` is not visible in this listing. Freeing it
 * would also free the childwrite_handle allocated under it, whose destructor
 * kills the child - confirm that happens here.
 */
331 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
332 struct timeval t, void *private_data)
334 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
335 struct ctdb_persistent_write_state);
336 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
/*
 * Parent-side handle for one child writer process (see ctdb_childwrite).
 *   ctdb, ctdb_db - used for the childwrite statistics and latency accounting
 *   fde           - read event on the parent's end of the pipe
 *   callback      - completion callback, called with the child's result
 *   start_time    - when the child was started, for latency statistics
 * NOTE(review): fd[2], child and private_data are used by the code below, but
 * their declarations are not visible in this listing.
 */
340 struct childwrite_handle {
341 struct ctdb_context *ctdb;
342 struct ctdb_db_context *ctdb_db;
343 struct fd_event *fde;
347 void (*callback)(int, void *);
348 struct timeval start_time;
/*
 * talloc destructor for a childwrite_handle that is freed before the child
 * has reported back, e.g. because the owning write state was freed. It drops
 * the pending_childwrite_calls statistic and SIGKILLs the child.
 * childwrite_handler clears this destructor before freeing the handle, so the
 * counter is decremented only once on the normal path.
 */
351 static int childwrite_destructor(struct childwrite_handle *h)
353 h->ctdb->statistics.pending_childwrite_calls--;
354 kill(h->child, SIGKILL);
358 /* called when the child process has finished writing the record to the
/*
 * fd-event handler for the read end of a child writer's pipe.
 *
 * 1. Copies the callback, its private data and the child pid out of the
 *    handle.
 * 2. Records the write latency and decrements pending_childwrite_calls.
 * 3. Reparents the handle onto a temporary context and clears its
 *    destructor, since the counter has already been decremented here.
 *    Freeing tmp_ctx at the end therefore releases the handle and, through
 *    EVENT_FD_AUTOCLOSE on its fd event, closes the pipe.
 * 4. Reads one status byte from the child and logs an unexpected read()
 *    result.
 * 5. SIGKILLs the child once the result has been handled.
 *
 * NOTE(review): the lines that turn the status byte into a result and invoke
 * callback(..., p) are not visible in this listing.
 */
361 static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
362 uint16_t flags, void *private_data)
364 struct childwrite_handle *h = talloc_get_type(private_data,
365 struct childwrite_handle);
366 void *p = h->private_data;
367 void (*callback)(int, void *) = h->callback;
368 pid_t child = h->child;
369 TALLOC_CTX *tmp_ctx = talloc_new(ev);
373 ctdb_latency(h->ctdb_db, "persistent", &h->ctdb->statistics.max_childwrite_latency, h->start_time);
374 h->ctdb->statistics.pending_childwrite_calls--;
376 /* the handle needs to go away when the context is gone - when
377 the handle goes away this implicitly closes the pipe, which
379 talloc_steal(tmp_ctx, h);
381 talloc_set_destructor(h, NULL);
383 ret = read(h->fd[0], &c, 1);
385 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
391 kill(child, SIGKILL);
392 talloc_free(tmp_ctx);
395 /* this creates a child process which will take out a tdb transaction
396 and write the record to the database.
/*
 * Fork a child process that writes state->m into ctdb_db via
 * ctdb_persistent_store(). The child reports the outcome to the parent as a
 * single byte over a pipe.
 *
 *   ctdb_db  - database to write; ctdb_db->ctdb holds the statistics
 *   callback - stored in the handle and used by childwrite_handler, with
 *              `state` as its private data
 *   state    - the write request. It is also the talloc parent of the
 *              returned handle, so freeing it frees the handle, and
 *              childwrite_destructor then SIGKILLs the child.
 *
 * Callers test the result against NULL for failure. Each visible failure
 * path (allocation, pipe, fork, fd-event registration) undoes the
 * pending_childwrite_calls increment.
 *
 * Child side: closes the read end, writes the records, writes one status
 * byte, then loops until kill(parent, 0) fails with ESRCH. The loop body is
 * not visible here.
 */
398 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
399 void (*callback)(int, void *private_data),
400 struct ctdb_persistent_write_state *state)
402 struct childwrite_handle *result;
404 pid_t parent = getpid();
406 ctdb_db->ctdb->statistics.childwrite_calls++;
407 ctdb_db->ctdb->statistics.pending_childwrite_calls++;
409 if (!(result = talloc_zero(state, struct childwrite_handle))) {
410 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
414 ret = pipe(result->fd);
418 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
422 result->child = fork();
424 if (result->child == (pid_t)-1) {
425 close(result->fd[0]);
426 close(result->fd[1]);
428 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
432 result->callback = callback;
433 result->private_data = state;
434 result->ctdb = ctdb_db->ctdb;
435 result->ctdb_db = ctdb_db;
437 if (result->child == 0) {
440 close(result->fd[0]);
441 ret = ctdb_persistent_store(state);
443 DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
/* NOTE(review): the return value of write() is ignored. If it fails, the
   parent only sees EOF or an error from read() in childwrite_handler. */
447 write(result->fd[1], &c, 1);
449 /* make sure we die when our parent dies */
450 while (kill(parent, 0) == 0 || errno != ESRCH) {
/* parent side: keep only the read end; it must not leak into exec'd helpers */
456 close(result->fd[1]);
457 set_close_on_exec(result->fd[0]);
459 talloc_set_destructor(result, childwrite_destructor);
461 DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
463 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
464 EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
/* NOTE(review): childwrite_destructor is already installed at this point. If
   the (not visible) failure path frees `result`, the destructor decrements
   pending_childwrite_calls, and the explicit decrement below would count it
   a second time - confirm against the full source. */
466 if (result->fde == NULL) {
468 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
472 result->start_time = timeval_current();
478 update a record on this node if the new record has a higher rsn than the
/*
 * Handler for CTDB_CONTROL_UPDATE_RECORD, the control that
 * ctdb_control_trans2_commit fans out to the other nodes.
 *
 * recdata is a struct ctdb_marshall_buffer. Its records are written to the
 * local copy of the database by a child process (ctdb_childwrite). The
 * control is answered later by ctdb_persistent_write_callback, or by
 * ctdb_persistent_lock_timeout after ctdb->tunable.control_timeout seconds.
 *
 * Requests are rejected while recovery is active or for an unknown db_id.
 * The return values are not visible in this listing.
 */
481 int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
482 struct ctdb_req_control *c, TDB_DATA recdata,
485 struct ctdb_db_context *ctdb_db;
486 struct ctdb_persistent_write_state *state;
487 struct childwrite_handle *handle;
488 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
490 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
491 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
/* NOTE(review): m->db_id is read without a visible check that recdata.dsize
   covers a struct ctdb_marshall_buffer header; confirm the size is
   validated upstream. */
495 ctdb_db = find_ctdb_db(ctdb, m->db_id);
496 if (ctdb_db == NULL) {
497 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
501 state = talloc(ctdb, struct ctdb_persistent_write_state);
502 CTDB_NO_MEMORY(ctdb, state);
504 state->ctdb_db = ctdb_db;
508 /* create a child process to take out a transaction and
511 handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
512 if (handle == NULL) {
513 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
518 /* we need to wait for the replies */
521 /* need to keep the control structure around */
522 talloc_steal(state, c);
524 /* but we won't wait forever */
525 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
526 ctdb_persistent_lock_timeout, state);
533 called when a client has finished a local commit in a transaction to
534 a persistent database
/*
 * Ends a trans2 commit that the calling client started with
 * CTDB_CONTROL_TRANS2_COMMIT. It looks up the database recorded in
 * client->db_id and clears its transaction_active flag. It then decrements
 * the client's num_persistent_updates. If that counter is already 0, the
 * bookkeeping is inconsistent and recovery is forced by setting recovery_mode
 * to CTDB_RECOVERY_ACTIVE. The return values are not visible in this listing.
 */
536 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
537 struct ctdb_req_control *c)
539 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
540 struct ctdb_db_context *ctdb_db;
/* NOTE(review): client is dereferenced without the NULL check that
   ctdb_control_trans2_commit performs after the same ctdb_reqid_find()
   lookup; an unmatched client_id would crash the daemon here. */
542 ctdb_db = find_ctdb_db(ctdb, client->db_id);
543 if (ctdb_db == NULL) {
544 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
545 "Unknown database 0x%08x\n", client->db_id));
548 if (!ctdb_db->transaction_active) {
549 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
550 "Database 0x%08x has no transaction commit "
551 "started\n", client->db_id));
555 ctdb_db->transaction_active = false;
558 if (client->num_persistent_updates == 0) {
559 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
560 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
561 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
564 client->num_persistent_updates--;
570 called when a client gets an error committing its database
571 during a transaction commit
/*
 * Called when the client's local commit failed during a trans2 commit.
 * It clears transaction_active on the database recorded in client->db_id
 * and adjusts num_persistent_updates; a 0 counter is logged as an error.
 * It then forces recovery by setting recovery_mode to CTDB_RECOVERY_ACTIVE,
 * because the other nodes may already hold the update. The branch structure
 * between the counter check and the recovery lines is not visible in this
 * listing; as listed, recovery appears to be forced on every path that gets
 * this far.
 */
573 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
574 struct ctdb_req_control *c)
576 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
577 struct ctdb_db_context *ctdb_db;
/* NOTE(review): client is dereferenced without the NULL check that
   ctdb_control_trans2_commit performs after the same ctdb_reqid_find()
   lookup; an unmatched client_id would crash the daemon here. */
579 ctdb_db = find_ctdb_db(ctdb, client->db_id);
580 if (ctdb_db == NULL) {
581 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
582 "Unknown database 0x%08x\n", client->db_id));
585 if (!ctdb_db->transaction_active) {
586 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
587 "Database 0x%08x has no transaction commit "
588 "started\n", client->db_id));
592 ctdb_db->transaction_active = false;
595 if (client->num_persistent_updates == 0) {
596 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
598 client->num_persistent_updates--;
601 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
602 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
608 * Tell whether a transaction is active on this node on the given DB.
/*
 * Reports whether a trans2 commit is active on database db_id on this node.
 * db_id is a parameter whose declaration is not visible in this listing.
 * The calling client's own commit on db_id is special-cased
 * (client->db_id == db_id) before ctdb_db->transaction_active is consulted.
 * The value returned in each case is not visible in this listing.
 */
610 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
611 struct ctdb_req_control *c,
614 struct ctdb_db_context *ctdb_db;
615 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
617 ctdb_db = find_ctdb_db(ctdb, db_id);
619 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
/* NOTE(review): client may be NULL if c->client_id does not match a
   connected client; it is dereferenced here without a visible check. */
623 if (client->db_id == db_id) {
627 if (ctdb_db->transaction_active) {
635 backwards compatibility:
637 start a persistent store operation, passing the key, header and
638 data to the daemon. If the client disconnects before it has issued
639 a persistent_update call to the daemon we trigger a full recovery
640 to ensure the databases are brought back in sync.
641 for now we ignore the recdata that the client has passed to us.
/*
 * Legacy-client control: increments the calling client's
 * num_persistent_updates. The counter is decremented again by a later
 * CTDB_CONTROL_PERSISTENT_STORE (in ctdb_control_trans2_commit) or by
 * ctdb_control_cancel_persistent_update. A request that does not match a
 * connected client is rejected with an error.
 */
643 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
644 struct ctdb_req_control *c,
647 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
649 if (client == NULL) {
650 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
654 client->num_persistent_updates++;
660 backwards compatibility:
662 called to tell ctdbd that it is no longer doing a persistent update
/*
 * Legacy-client control: undoes ctdb_control_start_persistent_update by
 * decrementing the calling client's num_persistent_updates. The decrement is
 * skipped when the counter is already 0, so it never goes negative. A
 * request that does not match a connected client is rejected with an error.
 */
664 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
665 struct ctdb_req_control *c,
668 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
670 if (client == NULL) {
671 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
675 if (client->num_persistent_updates > 0) {
676 client->num_persistent_updates--;
684 backwards compatibility:
686 single-record variant of ctdb_control_trans2_commit for older clients
/*
 * Legacy single-record CTDB_CONTROL_PERSISTENT_STORE. recdata holds one
 * struct ctdb_rec_data (key followed by data). The record is wrapped into a
 * one-entry marshall buffer (allocated under c) and handed to
 * ctdb_control_trans2_commit, with the same c and async_reply.
 *
 *   recdata - must be exactly offsetof(struct ctdb_rec_data, data) +
 *             keylen + datalen bytes long; otherwise an error is logged
 */
688 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
689 struct ctdb_req_control *c,
690 TDB_DATA recdata, bool *async_reply)
692 struct ctdb_marshall_buffer *m;
693 struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
/* NOTE(review): rec->keylen and rec->datalen are read before checking that
   recdata.dsize >= offsetof(struct ctdb_rec_data, data), so a short or
   empty payload causes a read past the buffer. If the length fields are
   32-bit, the sum can also wrap and pass this check with inconsistent
   lengths. */
696 if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
697 rec->keylen + rec->datalen) {
698 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
702 key.dptr = &rec->data[0];
703 key.dsize = rec->keylen;
704 data.dptr = &rec->data[rec->keylen];
705 data.dsize = rec->datalen;
/* rec->reqid is passed in two argument positions. Presumably the legacy
   format carries the database id in reqid, used here as both db_id and
   reqid - confirm against ctdb_marshall_add's prototype. */
707 m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
708 CTDB_NO_MEMORY(ctdb, m);
710 return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);