vacuum: skip adding records to list of records to send to lmaster on lmaster
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  bookkeeping for one persistent commit that is waiting for
  CTDB_CONTROL_UPDATE_RECORD replies from the nodes it was sent to
 */
struct ctdb_persistent_state {
        struct ctdb_context *ctdb;

        /* the next two are only filled in by trans3_commit */
        struct ctdb_db_context *ctdb_db;
        struct ctdb_client *client;

        /* the control that gets the final reply */
        struct ctdb_req_control *c;

        /* status and message of the most recent failed reply */
        const char *errormsg;
        int32_t status;

        /* replies still outstanding */
        uint32_t num_pending;

        /* failed replies seen / update_record controls sent */
        uint32_t num_failed;
        uint32_t num_sent;
};
39
40 /*
41   1) all nodes fail, and all nodes reply
42   2) some nodes fail, all nodes reply
43   3) some nodes timeout
44   4) all nodes succeed
45  */
46
47 /*
48   called when a node has acknowledged a ctdb_control_update_record call
49  */
50 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
51                                      int32_t status, TDB_DATA data, 
52                                      const char *errormsg,
53                                      void *private_data)
54 {
55         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
56                                                               struct ctdb_persistent_state);
57         enum ctdb_trans2_commit_error etype;
58
59         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
60                 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
61                                    "during recovery\n"));
62                 return;
63         }
64
65         if (status != 0) {
66                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
67                          status, errormsg?errormsg:"no error message given"));
68                 state->status = status;
69                 state->errormsg = errormsg;
70                 state->num_failed++;
71
72                 /*
73                  * If a node failed to complete the update_record control,
74                  * then either a recovery is already running or something
75                  * bad is going on. So trigger a recovery and let the
76                  * recovery finish the transaction, sending back the reply
77                  * for the trans3_commit control to the client.
78                  */
79                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
80                 return;
81         }
82
83         state->num_pending--;
84
85         if (state->num_pending != 0) {
86                 return;
87         }
88
89         if (state->num_failed == state->num_sent) {
90                 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
91         } else if (state->num_failed != 0) {
92                 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
93         } else {
94                 etype = CTDB_TRANS2_COMMIT_SUCCESS;
95         }
96
97         ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
98         talloc_free(state);
99 }
100
101 /*
102   called if persistent store times out
103  */
104 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
105                                          struct timeval t, void *private_data)
106 {
107         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
108
109         if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
110                 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
111                                    "timeout during recovery\n"));
112                 return;
113         }
114
115         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
116                                    "timeout in ctdb_persistent_state");
117
118         talloc_free(state);
119 }
120
121 /**
122  * Finish pending trans3 commit controls, i.e. send
123  * reply to the client. This is called by the end-recovery
124  * control to fix the situation when a recovery interrupts
125  * the usual porgress of a transaction.
126  */
127 void ctdb_persistent_finish_trans3_commits(struct ctdb_context *ctdb)
128 {
129         struct ctdb_db_context *ctdb_db;
130
131         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
132                 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
133                                    "timeout during recovery\n"));
134                 return;
135         }
136
137         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
138                 struct ctdb_persistent_state *state;
139
140                 if (ctdb_db->persistent_state == NULL) {
141                         continue;
142                 }
143
144                 state = ctdb_db->persistent_state;
145
146                 ctdb_request_control_reply(ctdb, state->c, NULL,
147                                            CTDB_TRANS2_COMMIT_SOMEFAIL,
148                                            "trans3 commit ended by recovery");
149
150                 /* The destructor sets ctdb_db->persistent_state to NULL. */
151                 talloc_free(state);
152         }
153 }
154
155 /*
156   store a set of persistent records - called from a ctdb client when it has updated
157   some records in a persistent database. The client will have the record
158   locked for the duration of this call. The client is the dmaster when 
159   this call is made
160  */
161 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
162                                    struct ctdb_req_control *c, 
163                                    TDB_DATA recdata, bool *async_reply)
164 {
165         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
166         struct ctdb_persistent_state *state;
167         int i;
168         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
169         struct ctdb_db_context *ctdb_db;
170
171         ctdb_db = find_ctdb_db(ctdb, m->db_id);
172         if (ctdb_db == NULL) {
173                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
174                                  "Unknown database db_id[0x%08x]\n", m->db_id));
175                 return -1;
176         }
177
178         if (client == NULL) {
179                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
180                 return -1;
181         }
182
183         if (ctdb_db->unhealthy_reason) {
184                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
185                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
186                 return -1;
187         }
188
189         /* handling num_persistent_updates is a bit strange - 
190            there are 3 cases
191              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
192                 They don't expect num_persistent_updates to be used at all
193
194              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
195                 this commit to then decrement it
196
197              3) new clients which use TRANS2 commit functions, and
198                 expect this function to increment the counter, and
199                 then have it decremented in ctdb_control_trans2_error
200                 or ctdb_control_trans2_finished
201         */
202         switch (c->opcode) {
203         case CTDB_CONTROL_PERSISTENT_STORE:
204                 if (ctdb_db->transaction_active) {
205                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
206                                           "transaction is active on database "
207                                           "db_id[0x%08x] - refusing persistent "
208                                          " store for client id[0x%08x]\n",
209                                           ctdb_db->db_id, client->client_id));
210                         return -1;
211                 }
212                 if (client->num_persistent_updates > 0) {
213                         client->num_persistent_updates--;
214                 }
215                 break;
216         case CTDB_CONTROL_TRANS2_COMMIT:
217                 if (ctdb_db->transaction_active) {
218                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
219                                          " already a transaction commit "
220                                          "active on db_id[0x%08x] - forbidding "
221                                          "client_id[0x%08x] to commit\n",
222                                          ctdb_db->db_id, client->client_id));
223                         return -1;
224                 }
225                 if (client->db_id != 0) {
226                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
227                                          "client-db_id[0x%08x] != 0 "
228                                          "(client_id[0x%08x])\n",
229                                          client->db_id, client->client_id));
230                         return -1;
231                 }
232                 client->num_persistent_updates++;
233                 ctdb_db->transaction_active = true;
234                 client->db_id = m->db_id;
235                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
236                                   " commit transaction on db id[0x%08x]\n",
237                                   client->client_id, client->db_id));
238                 break;
239         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
240                 /* already updated from the first commit */
241                 if (client->db_id != m->db_id) {
242                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
243                                          "retry: client-db_id[0x%08x] != "
244                                          "db_id[0x%08x] (client_id[0x%08x])\n",
245                                          client->db_id,
246                                          m->db_id, client->client_id));
247                         return -1;
248                 }
249                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
250                                     "transaction commit retry on "
251                                     "db_id[0x%08x]\n",
252                                     client->client_id, client->db_id));
253                 break;
254         }
255
256         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
257                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
258                 return -1;
259         }
260
261         state = talloc_zero(ctdb, struct ctdb_persistent_state);
262         CTDB_NO_MEMORY(ctdb, state);
263
264         state->ctdb = ctdb;
265         state->c    = c;
266
267         for (i=0;i<ctdb->vnn_map->size;i++) {
268                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
269                 int ret;
270
271                 /* only send to active nodes */
272                 if (node->flags & NODE_FLAGS_INACTIVE) {
273                         continue;
274                 }
275
276                 /* don't send to ourselves */
277                 if (node->pnn == ctdb->pnn) {
278                         continue;
279                 }
280                 
281                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
282                                                c->client_id, 0, recdata, 
283                                                ctdb_persistent_callback, state);
284                 if (ret == -1) {
285                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
286                         talloc_free(state);
287                         return -1;
288                 }
289
290                 state->num_pending++;
291                 state->num_sent++;
292         }
293
294         if (state->num_pending == 0) {
295                 talloc_free(state);
296                 return 0;
297         }
298         
299         /* we need to wait for the replies */
300         *async_reply = true;
301
302         /* need to keep the control structure around */
303         talloc_steal(state, c);
304
305         /* but we won't wait forever */
306         event_add_timed(ctdb->ev, state, 
307                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
308                         ctdb_persistent_store_timeout, state);
309
310         return 0;
311 }
312
313 static int ctdb_persistent_state_destructor(struct ctdb_persistent_state *state)
314 {
315         if (state->client != NULL) {
316                 state->client->db_id = 0;
317         }
318
319         if (state->ctdb_db != NULL) {
320                 state->ctdb_db->persistent_state = NULL;
321         }
322
323         return 0;
324 }
325
326 /*
327  * Store a set of persistent records.
328  * This is used to roll out a transaction to all nodes.
329  */
330 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
331                                    struct ctdb_req_control *c,
332                                    TDB_DATA recdata, bool *async_reply)
333 {
334         struct ctdb_client *client;
335         struct ctdb_persistent_state *state;
336         int i;
337         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
338         struct ctdb_db_context *ctdb_db;
339
340         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
341                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
342                 return -1;
343         }
344
345         client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
346         if (client == NULL) {
347                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
348                                  "to a client. Returning error\n"));
349                 return -1;
350         }
351
352         if (client->db_id != 0) {
353                 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans3_commit: "
354                                  "client-db_id[0x%08x] != 0 "
355                                  "(client_id[0x%08x]): trans3_commit active?\n",
356                                  client->db_id, client->client_id));
357                 return -1;
358         }
359
360         ctdb_db = find_ctdb_db(ctdb, m->db_id);
361         if (ctdb_db == NULL) {
362                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
363                                  "Unknown database db_id[0x%08x]\n", m->db_id));
364                 return -1;
365         }
366
367         if (ctdb_db->persistent_state != NULL) {
368                 DEBUG(DEBUG_ERR, (__location__ " Error: "
369                                   "ctdb_control_trans3_commit "
370                                   "called while a transaction commit is "
371                                   "active. db_id[0x%08x]\n", m->db_id));
372                 return -1;
373         }
374
375         ctdb_db->persistent_state = talloc_zero(ctdb_db,
376                                                 struct ctdb_persistent_state);
377         CTDB_NO_MEMORY(ctdb, ctdb_db->persistent_state);
378
379         client->db_id = m->db_id;
380
381         state = ctdb_db->persistent_state;
382         state->ctdb = ctdb;
383         state->ctdb_db = ctdb_db;
384         state->c    = c;
385         state->client = client;
386
387         talloc_set_destructor(state, ctdb_persistent_state_destructor);
388
389         for (i = 0; i < ctdb->vnn_map->size; i++) {
390                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
391                 int ret;
392
393                 /* only send to active nodes */
394                 if (node->flags & NODE_FLAGS_INACTIVE) {
395                         continue;
396                 }
397
398                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
399                                                CTDB_CONTROL_UPDATE_RECORD,
400                                                c->client_id, 0, recdata,
401                                                ctdb_persistent_callback,
402                                                state);
403                 if (ret == -1) {
404                         DEBUG(DEBUG_ERR,("Unable to send "
405                                          "CTDB_CONTROL_UPDATE_RECORD "
406                                          "to pnn %u\n", node->pnn));
407                         talloc_free(state);
408                         return -1;
409                 }
410
411                 state->num_pending++;
412                 state->num_sent++;
413         }
414
415         if (state->num_pending == 0) {
416                 talloc_free(state);
417                 return 0;
418         }
419
420         /* we need to wait for the replies */
421         *async_reply = true;
422
423         /* need to keep the control structure around */
424         talloc_steal(state, c);
425
426         /* but we won't wait forever */
427         event_add_timed(ctdb->ev, state,
428                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
429                         ctdb_persistent_store_timeout, state);
430
431         return 0;
432 }
433
434
/*
  one update_record request: what to write, where to write it, and
  which control to answer afterwards
 */
struct ctdb_persistent_write_state {
        /* database the records are written to */
        struct ctdb_db_context *ctdb_db;
        /* marshalled records to store */
        struct ctdb_marshall_buffer *m;
        /* control that receives the reply */
        struct ctdb_req_control *c;
};
440
441
442 /*
443   called from a child process to write the data
444  */
445 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
446 {
447         int ret, i;
448         struct ctdb_rec_data *rec = NULL;
449         struct ctdb_marshall_buffer *m = state->m;
450
451         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
452         if (ret == -1) {
453                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
454                                  state->ctdb_db->db_id));
455                 return -1;
456         }
457
458         for (i=0;i<m->count;i++) {
459                 struct ctdb_ltdb_header oldheader;
460                 struct ctdb_ltdb_header header;
461                 TDB_DATA key, data, olddata;
462                 TALLOC_CTX *tmp_ctx = talloc_new(state);
463
464                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
465                 
466                 if (rec == NULL) {
467                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
468                                          i, state->ctdb_db->db_id));
469                         talloc_free(tmp_ctx);
470                         goto failed;                    
471                 }
472
473                 /* fetch the old header and ensure the rsn is less than the new rsn */
474                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
477                                          state->ctdb_db->db_id));
478                         talloc_free(tmp_ctx);
479                         goto failed;
480                 }
481
482                 if (oldheader.rsn >= header.rsn &&
483                     (olddata.dsize != data.dsize || 
484                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
485                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
486                                           state->ctdb_db->db_id, 
487                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
488                         talloc_free(tmp_ctx);
489                         goto failed;
490                 }
491
492                 talloc_free(tmp_ctx);
493
494                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
495                 if (ret != 0) {
496                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
497                                           state->ctdb_db->db_id));
498                         goto failed;
499                 }
500         }
501
502         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
503         if (ret == -1) {
504                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
505                                  state->ctdb_db->db_id));
506                 return -1;
507         }
508
509         return 0;
510         
511 failed:
512         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
513         return -1;
514 }
515
516
517 /*
518   called when we the child has completed the persistent write
519   on our behalf
520  */
521 static void ctdb_persistent_write_callback(int status, void *private_data)
522 {
523         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
524                                                                    struct ctdb_persistent_write_state);
525
526
527         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
528
529         talloc_free(state);
530 }
531
532 /*
533   called if our lockwait child times out
534  */
535 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
536                                          struct timeval t, void *private_data)
537 {
538         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
539                                                                    struct ctdb_persistent_write_state);
540         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
541         talloc_free(state);
542 }
543
/*
  parent-side handle for one forked write child
 */
struct childwrite_handle {
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;

        /* fd event watching the read end of the pipe */
        struct fd_event *fde;
        /* pipe between parent (reads fd[0]) and child (writes fd[1]) */
        int fd[2];
        /* pid of the forked child */
        pid_t child;

        /* completion callback and the argument passed back to it */
        void *private_data;
        void (*callback)(int status, void *private_data);

        /* when the handle was set up; used for latency accounting */
        struct timeval start_time;
};
554
555 static int childwrite_destructor(struct childwrite_handle *h)
556 {
557         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
558         kill(h->child, SIGKILL);
559         return 0;
560 }
561
562 /* called when the child process has finished writing the record to the
563    database
564 */
565 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
566                              uint16_t flags, void *private_data)
567 {
568         struct childwrite_handle *h = talloc_get_type(private_data, 
569                                                      struct childwrite_handle);
570         void *p = h->private_data;
571         void (*callback)(int, void *) = h->callback;
572         pid_t child = h->child;
573         TALLOC_CTX *tmp_ctx = talloc_new(ev);
574         int ret;
575         char c;
576
577         CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
578         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
579
580         /* the handle needs to go away when the context is gone - when
581            the handle goes away this implicitly closes the pipe, which
582            kills the child */
583         talloc_steal(tmp_ctx, h);
584
585         talloc_set_destructor(h, NULL);
586
587         ret = read(h->fd[0], &c, 1);
588         if (ret < 1) {
589                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
590                 c = 1;
591         }
592
593         callback(c, p);
594
595         kill(child, SIGKILL);
596         talloc_free(tmp_ctx);
597 }
598
599 /* this creates a child process which will take out a tdb transaction
600    and write the record to the database.
601 */
602 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
603                                 void (*callback)(int, void *private_data),
604                                 struct ctdb_persistent_write_state *state)
605 {
606         struct childwrite_handle *result;
607         int ret;
608         pid_t parent = getpid();
609
610         CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
611         CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
612
613         if (!(result = talloc_zero(state, struct childwrite_handle))) {
614                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
615                 return NULL;
616         }
617
618         ret = pipe(result->fd);
619
620         if (ret != 0) {
621                 talloc_free(result);
622                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
623                 return NULL;
624         }
625
626         result->child = ctdb_fork(ctdb_db->ctdb);
627
628         if (result->child == (pid_t)-1) {
629                 close(result->fd[0]);
630                 close(result->fd[1]);
631                 talloc_free(result);
632                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
633                 return NULL;
634         }
635
636         result->callback = callback;
637         result->private_data = state;
638         result->ctdb = ctdb_db->ctdb;
639         result->ctdb_db = ctdb_db;
640
641         if (result->child == 0) {
642                 char c = 0;
643
644                 close(result->fd[0]);
645                 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
646                 ret = ctdb_persistent_store(state);
647                 if (ret != 0) {
648                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
649                         c = 1;
650                 }
651
652                 write(result->fd[1], &c, 1);
653
654                 /* make sure we die when our parent dies */
655                 while (kill(parent, 0) == 0 || errno != ESRCH) {
656                         sleep(5);
657                 }
658                 _exit(0);
659         }
660
661         close(result->fd[1]);
662         set_close_on_exec(result->fd[0]);
663
664         talloc_set_destructor(result, childwrite_destructor);
665
666         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
667
668         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
669                                    EVENT_FD_READ, childwrite_handler,
670                                    (void *)result);
671         if (result->fde == NULL) {
672                 talloc_free(result);
673                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
674                 return NULL;
675         }
676         tevent_fd_set_auto_close(result->fde);
677
678         result->start_time = timeval_current();
679
680         return result;
681 }
682
683 /* 
684    update a record on this node if the new record has a higher rsn than the
685    current record
686  */
687 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
688                                    struct ctdb_req_control *c, TDB_DATA recdata, 
689                                    bool *async_reply)
690 {
691         struct ctdb_db_context *ctdb_db;
692         struct ctdb_persistent_write_state *state;
693         struct childwrite_handle *handle;
694         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
695
696         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
697                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
698                 return -1;
699         }
700
701         ctdb_db = find_ctdb_db(ctdb, m->db_id);
702         if (ctdb_db == NULL) {
703                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
704                 return -1;
705         }
706
707         if (ctdb_db->unhealthy_reason) {
708                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
709                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
710                 return -1;
711         }
712
713         state = talloc(ctdb, struct ctdb_persistent_write_state);
714         CTDB_NO_MEMORY(ctdb, state);
715
716         state->ctdb_db = ctdb_db;
717         state->c       = c;
718         state->m       = m;
719
720         /* create a child process to take out a transaction and 
721            write the data.
722         */
723         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
724         if (handle == NULL) {
725                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
726                 talloc_free(state);
727                 return -1;
728         }
729
730         /* we need to wait for the replies */
731         *async_reply = true;
732
733         /* need to keep the control structure around */
734         talloc_steal(state, c);
735
736         /* but we won't wait forever */
737         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
738                         ctdb_persistent_lock_timeout, state);
739
740         return 0;
741 }
742
743
744 /*
745   called when a client has finished a local commit in a transaction to 
746   a persistent database
747  */
748 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
749                                      struct ctdb_req_control *c)
750 {
751         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
752         struct ctdb_db_context *ctdb_db;
753
754         ctdb_db = find_ctdb_db(ctdb, client->db_id);
755         if (ctdb_db == NULL) {
756                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
757                                  "Unknown database 0x%08x\n", client->db_id));
758                 return -1;
759         }
760         if (!ctdb_db->transaction_active) {
761                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
762                                  "Database 0x%08x has no transaction commit "
763                                  "started\n", client->db_id));
764                 return -1;
765         }
766
767         ctdb_db->transaction_active = false;
768         client->db_id = 0;
769
770         if (client->num_persistent_updates == 0) {
771                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
772                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
773                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
774                 return -1;
775         }
776         client->num_persistent_updates--;
777
778         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
779                             "transaction commit db_id[0x%08x]\n",
780                             client->client_id, ctdb_db->db_id));
781
782         return 0;
783 }
784
785 /*
786   called when a client gets an error committing its database
787   during a transaction commit
788  */
789 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
790                                   struct ctdb_req_control *c)
791 {
792         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
793         struct ctdb_db_context *ctdb_db;
794
795         ctdb_db = find_ctdb_db(ctdb, client->db_id);
796         if (ctdb_db == NULL) {
797                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
798                                  "Unknown database 0x%08x\n", client->db_id));
799                 return -1;
800         }
801         if (!ctdb_db->transaction_active) {
802                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
803                                  "Database 0x%08x has no transaction commit "
804                                  "started\n", client->db_id));
805                 return -1;
806         }
807
808         ctdb_db->transaction_active = false;
809         client->db_id = 0;
810
811         if (client->num_persistent_updates == 0) {
812                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
813         } else {
814                 client->num_persistent_updates--;
815         }
816
817         DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
818                          " db_id[0x%08x] - forcing recovery\n",
819                          ctdb_db->db_id));
820         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
821
822         return 0;
823 }
824
825 /**
826  * Tell whether a transaction is active on this node on the give DB.
827  */
828 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
829                                    struct ctdb_req_control *c,
830                                    uint32_t db_id)
831 {
832         struct ctdb_db_context *ctdb_db;
833         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
834
835         ctdb_db = find_ctdb_db(ctdb, db_id);
836         if (!ctdb_db) {
837                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
838                 return -1;
839         }
840
841         if (client->db_id == db_id) {
842                 return 0;
843         }
844
845         if (ctdb_db->transaction_active) {
846                 return 1;
847         } else {
848                 return 0;
849         }
850 }
851
852 /*
853   backwards compatibility:
854
855   start a persistent store operation. passing both the key, header and
856   data to the daemon. If the client disconnects before it has issued
857   a persistent_update call to the daemon we trigger a full recovery
858   to ensure the databases are brought back in sync.
859   for now we ignore the recdata that the client has passed to us.
860  */
861 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
862                                       struct ctdb_req_control *c,
863                                       TDB_DATA recdata)
864 {
865         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
866
867         if (client == NULL) {
868                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
869                 return -1;
870         }
871
872         client->num_persistent_updates++;
873
874         return 0;
875 }
876
877 /* 
878   backwards compatibility:
879
880   called to tell ctdbd that it is no longer doing a persistent update 
881 */
882 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
883                                               struct ctdb_req_control *c,
884                                               TDB_DATA recdata)
885 {
886         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
887
888         if (client == NULL) {
889                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
890                 return -1;
891         }
892
893         if (client->num_persistent_updates > 0) {
894                 client->num_persistent_updates--;
895         }
896
897         return 0;
898 }
899
900
901 /*
902   backwards compatibility:
903
904   single record varient of ctdb_control_trans2_commit for older clients
905  */
906 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
907                                       struct ctdb_req_control *c, 
908                                       TDB_DATA recdata, bool *async_reply)
909 {
910         struct ctdb_marshall_buffer *m;
911         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
912         TDB_DATA key, data;
913
914         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
915             rec->keylen + rec->datalen) {
916                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
917                 return -1;
918         }
919
920         key.dptr = &rec->data[0];
921         key.dsize = rec->keylen;
922         data.dptr = &rec->data[rec->keylen];
923         data.dsize = rec->datalen;
924
925         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
926         CTDB_NO_MEMORY(ctdb, m);
927
928         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
929 }
930
931 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
932                                   uint32_t db_id,
933                                   uint64_t *seqnum)
934 {
935         int32_t ret;
936         struct ctdb_db_context *ctdb_db;
937         const char *keyname = CTDB_DB_SEQNUM_KEY;
938         TDB_DATA key;
939         TDB_DATA data;
940         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
941
942         ctdb_db = find_ctdb_db(ctdb, db_id);
943         if (!ctdb_db) {
944                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
945                 ret = -1;
946                 goto done;
947         }
948
949         key.dptr = (uint8_t *)discard_const(keyname);
950         key.dsize = strlen(keyname) + 1;
951
952         ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
953         if (ret != 0) {
954                 goto done;
955         }
956
957         if (data.dsize != sizeof(uint64_t)) {
958                 *seqnum = 0;
959                 goto done;
960         }
961
962         *seqnum = *(uint64_t *)data.dptr;
963
964 done:
965         talloc_free(mem_ctx);
966         return ret;
967 }
968
969 /**
970  * Get the sequence number of a persistent database.
971  */
972 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
973                                    TDB_DATA indata,
974                                    TDB_DATA *outdata)
975 {
976         uint32_t db_id;
977         int32_t ret;
978         uint64_t seqnum;
979
980         db_id = *(uint32_t *)indata.dptr;
981         ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
982         if (ret != 0) {
983                 goto done;
984         }
985
986         outdata->dsize = sizeof(uint64_t);
987         outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
988         if (outdata->dptr == NULL) {
989                 ret = -1;
990                 goto done;
991         }
992
993         *(outdata->dptr) = seqnum;
994
995 done:
996         return ret;
997 }