afefeaf77f09c11f94310534ad61507c8b07b527
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  Bookkeeping for one commit that is being rolled out to other nodes via
  CTDB_CONTROL_UPDATE_RECORD controls.
*/
struct ctdb_persistent_state {
	/* daemon context the commit runs in */
	struct ctdb_context *ctdb;
	/* target database; talloc_zero leaves it NULL unless the committer sets it */
	struct ctdb_db_context *ctdb_db;
	/* the originating control that is answered once all replies are in */
	struct ctdb_req_control *c;
	/* error message from the most recent failed reply, if any */
	const char *errormsg;
	/* UPDATE_RECORD controls still waiting for a successful reply */
	uint32_t num_pending;
	/* status code of the most recent failed reply */
	int32_t status;
	/* number of replies that reported a failure */
	uint32_t num_failed;
	/* number of UPDATE_RECORD controls that were sent */
	uint32_t num_sent;
};
38
39 /*
40   1) all nodes fail, and all nodes reply
41   2) some nodes fail, all nodes reply
42   3) some nodes timeout
43   4) all nodes succeed
44  */
45
46 /*
47   called when a node has acknowledged a ctdb_control_update_record call
48  */
49 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
50                                      int32_t status, TDB_DATA data, 
51                                      const char *errormsg,
52                                      void *private_data)
53 {
54         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
55                                                               struct ctdb_persistent_state);
56         enum ctdb_trans2_commit_error etype;
57
58         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
59                 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
60                                    "during recovery\n"));
61                 return;
62         }
63
64         if (status != 0) {
65                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
66                          status, errormsg?errormsg:"no error message given"));
67                 state->status = status;
68                 state->errormsg = errormsg;
69                 state->num_failed++;
70
71                 /*
72                  * If a node failed to complete the update_record control,
73                  * then either a recovery is already running or something
74                  * bad is going on. So trigger a recovery and let the
75                  * recovery finish the transaction, sending back the reply
76                  * for the trans3_commit control to the client.
77                  */
78                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
79                 return;
80         }
81
82         state->num_pending--;
83
84         if (state->num_pending != 0) {
85                 return;
86         }
87
88         if (state->num_failed == state->num_sent) {
89                 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
90         } else if (state->num_failed != 0) {
91                 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
92         } else {
93                 etype = CTDB_TRANS2_COMMIT_SUCCESS;
94         }
95
96         ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
97         talloc_free(state);
98 }
99
100 /*
101   called if persistent store times out
102  */
103 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
104                                          struct timeval t, void *private_data)
105 {
106         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
107
108         if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
109                 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
110                                    "timeout during recovery\n"));
111                 return;
112         }
113
114         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
115                                    "timeout in ctdb_persistent_state");
116
117         talloc_free(state);
118 }
119
120 /*
121   store a set of persistent records - called from a ctdb client when it has updated
122   some records in a persistent database. The client will have the record
123   locked for the duration of this call. The client is the dmaster when 
124   this call is made
125  */
126 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
127                                    struct ctdb_req_control *c, 
128                                    TDB_DATA recdata, bool *async_reply)
129 {
130         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
131         struct ctdb_persistent_state *state;
132         int i;
133         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
134         struct ctdb_db_context *ctdb_db;
135
136         ctdb_db = find_ctdb_db(ctdb, m->db_id);
137         if (ctdb_db == NULL) {
138                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
139                                  "Unknown database db_id[0x%08x]\n", m->db_id));
140                 return -1;
141         }
142
143         if (client == NULL) {
144                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
145                 return -1;
146         }
147
148         if (ctdb_db->unhealthy_reason) {
149                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
150                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
151                 return -1;
152         }
153
154         /* handling num_persistent_updates is a bit strange - 
155            there are 3 cases
156              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
157                 They don't expect num_persistent_updates to be used at all
158
159              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
160                 this commit to then decrement it
161
162              3) new clients which use TRANS2 commit functions, and
163                 expect this function to increment the counter, and
164                 then have it decremented in ctdb_control_trans2_error
165                 or ctdb_control_trans2_finished
166         */
167         switch (c->opcode) {
168         case CTDB_CONTROL_PERSISTENT_STORE:
169                 if (ctdb_db->transaction_active) {
170                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
171                                           "transaction is active on database "
172                                           "db_id[0x%08x] - refusing persistent "
173                                          " store for client id[0x%08x]\n",
174                                           ctdb_db->db_id, client->client_id));
175                         return -1;
176                 }
177                 if (client->num_persistent_updates > 0) {
178                         client->num_persistent_updates--;
179                 }
180                 break;
181         case CTDB_CONTROL_TRANS2_COMMIT:
182                 if (ctdb_db->transaction_active) {
183                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
184                                          " already a transaction commit "
185                                          "active on db_id[0x%08x] - forbidding "
186                                          "client_id[0x%08x] to commit\n",
187                                          ctdb_db->db_id, client->client_id));
188                         return -1;
189                 }
190                 if (client->db_id != 0) {
191                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
192                                          "client-db_id[0x%08x] != 0 "
193                                          "(client_id[0x%08x])\n",
194                                          client->db_id, client->client_id));
195                         return -1;
196                 }
197                 client->num_persistent_updates++;
198                 ctdb_db->transaction_active = true;
199                 client->db_id = m->db_id;
200                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
201                                   " commit transaction on db id[0x%08x]\n",
202                                   client->client_id, client->db_id));
203                 break;
204         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
205                 /* already updated from the first commit */
206                 if (client->db_id != m->db_id) {
207                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
208                                          "retry: client-db_id[0x%08x] != "
209                                          "db_id[0x%08x] (client_id[0x%08x])\n",
210                                          client->db_id,
211                                          m->db_id, client->client_id));
212                         return -1;
213                 }
214                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
215                                     "transaction commit retry on "
216                                     "db_id[0x%08x]\n",
217                                     client->client_id, client->db_id));
218                 break;
219         }
220
221         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
222                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
223                 return -1;
224         }
225
226         state = talloc_zero(ctdb, struct ctdb_persistent_state);
227         CTDB_NO_MEMORY(ctdb, state);
228
229         state->ctdb = ctdb;
230         state->c    = c;
231
232         for (i=0;i<ctdb->vnn_map->size;i++) {
233                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
234                 int ret;
235
236                 /* only send to active nodes */
237                 if (node->flags & NODE_FLAGS_INACTIVE) {
238                         continue;
239                 }
240
241                 /* don't send to ourselves */
242                 if (node->pnn == ctdb->pnn) {
243                         continue;
244                 }
245                 
246                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
247                                                c->client_id, 0, recdata, 
248                                                ctdb_persistent_callback, state);
249                 if (ret == -1) {
250                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
251                         talloc_free(state);
252                         return -1;
253                 }
254
255                 state->num_pending++;
256                 state->num_sent++;
257         }
258
259         if (state->num_pending == 0) {
260                 talloc_free(state);
261                 return 0;
262         }
263         
264         /* we need to wait for the replies */
265         *async_reply = true;
266
267         /* need to keep the control structure around */
268         talloc_steal(state, c);
269
270         /* but we won't wait forever */
271         event_add_timed(ctdb->ev, state, 
272                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
273                         ctdb_persistent_store_timeout, state);
274
275         return 0;
276 }
277
278
279 /*
280  * Store a set of persistent records.
281  * This is used to roll out a transaction to all nodes.
282  */
283 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
284                                    struct ctdb_req_control *c,
285                                    TDB_DATA recdata, bool *async_reply)
286 {
287         struct ctdb_client *client;
288         struct ctdb_persistent_state *state;
289         int i;
290         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
291         struct ctdb_db_context *ctdb_db;
292
293         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
294                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
295                 return -1;
296         }
297
298         ctdb_db = find_ctdb_db(ctdb, m->db_id);
299         if (ctdb_db == NULL) {
300                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
301                                  "Unknown database db_id[0x%08x]\n", m->db_id));
302                 return -1;
303         }
304
305         client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
306         if (client == NULL) {
307                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
308                                  "to a client. Returning error\n"));
309                 return -1;
310         }
311
312         state = talloc_zero(ctdb, struct ctdb_persistent_state);
313         CTDB_NO_MEMORY(ctdb, state);
314
315         state->ctdb = ctdb;
316         state->c    = c;
317
318         for (i = 0; i < ctdb->vnn_map->size; i++) {
319                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
320                 int ret;
321
322                 /* only send to active nodes */
323                 if (node->flags & NODE_FLAGS_INACTIVE) {
324                         continue;
325                 }
326
327                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
328                                                CTDB_CONTROL_UPDATE_RECORD,
329                                                c->client_id, 0, recdata,
330                                                ctdb_persistent_callback,
331                                                state);
332                 if (ret == -1) {
333                         DEBUG(DEBUG_ERR,("Unable to send "
334                                          "CTDB_CONTROL_UPDATE_RECORD "
335                                          "to pnn %u\n", node->pnn));
336                         talloc_free(state);
337                         return -1;
338                 }
339
340                 state->num_pending++;
341                 state->num_sent++;
342         }
343
344         if (state->num_pending == 0) {
345                 talloc_free(state);
346                 return 0;
347         }
348
349         /* we need to wait for the replies */
350         *async_reply = true;
351
352         /* need to keep the control structure around */
353         talloc_steal(state, c);
354
355         /* but we won't wait forever */
356         event_add_timed(ctdb->ev, state,
357                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
358                         ctdb_persistent_store_timeout, state);
359
360         return 0;
361 }
362
363
/*
  Per-request state for CTDB_CONTROL_UPDATE_RECORD handled through a
  childwrite process.
*/
struct ctdb_persistent_write_state {
	/* database the records are written to */
	struct ctdb_db_context *ctdb_db;
	/* marshalled records taken from the control's data */
	struct ctdb_marshall_buffer *m;
	/* the control that is answered when the write completes or times out */
	struct ctdb_req_control *c;
};
369
370
371 /*
372   called from a child process to write the data
373  */
374 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
375 {
376         int ret, i;
377         struct ctdb_rec_data *rec = NULL;
378         struct ctdb_marshall_buffer *m = state->m;
379
380         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
381         if (ret == -1) {
382                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
383                                  state->ctdb_db->db_id));
384                 return -1;
385         }
386
387         for (i=0;i<m->count;i++) {
388                 struct ctdb_ltdb_header oldheader;
389                 struct ctdb_ltdb_header header;
390                 TDB_DATA key, data, olddata;
391                 TALLOC_CTX *tmp_ctx = talloc_new(state);
392
393                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
394                 
395                 if (rec == NULL) {
396                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
397                                          i, state->ctdb_db->db_id));
398                         talloc_free(tmp_ctx);
399                         goto failed;                    
400                 }
401
402                 /* fetch the old header and ensure the rsn is less than the new rsn */
403                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
404                 if (ret != 0) {
405                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
406                                          state->ctdb_db->db_id));
407                         talloc_free(tmp_ctx);
408                         goto failed;
409                 }
410
411                 if (oldheader.rsn >= header.rsn &&
412                     (olddata.dsize != data.dsize || 
413                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
414                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
415                                           state->ctdb_db->db_id, 
416                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
417                         talloc_free(tmp_ctx);
418                         goto failed;
419                 }
420
421                 talloc_free(tmp_ctx);
422
423                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
424                 if (ret != 0) {
425                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
426                                           state->ctdb_db->db_id));
427                         goto failed;
428                 }
429         }
430
431         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
432         if (ret == -1) {
433                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
434                                  state->ctdb_db->db_id));
435                 return -1;
436         }
437
438         return 0;
439         
440 failed:
441         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
442         return -1;
443 }
444
445
446 /*
447   called when we the child has completed the persistent write
448   on our behalf
449  */
450 static void ctdb_persistent_write_callback(int status, void *private_data)
451 {
452         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
453                                                                    struct ctdb_persistent_write_state);
454
455
456         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
457
458         talloc_free(state);
459 }
460
461 /*
462   called if our lockwait child times out
463  */
464 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
465                                          struct timeval t, void *private_data)
466 {
467         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
468                                                                    struct ctdb_persistent_write_state);
469         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
470         talloc_free(state);
471 }
472
/*
  Parent-side handle for a forked childwrite process.
*/
struct childwrite_handle {
	/* daemon context, used for statistics */
	struct ctdb_context *ctdb;
	/* database the child writes to */
	struct ctdb_db_context *ctdb_db;
	/* read event on fd[0] */
	struct fd_event *fde;
	/* status pipe: the child writes one byte to fd[1], the parent reads fd[0] */
	int fd[2];
	/* pid of the forked writer */
	pid_t child;
	/* argument passed to callback */
	void *private_data;
	/* invoked with the child's status byte */
	void (*callback)(int, void *);
	/* set when the fd event is registered, for latency statistics */
	struct timeval start_time;
};
483
484 static int childwrite_destructor(struct childwrite_handle *h)
485 {
486         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
487         kill(h->child, SIGKILL);
488         return 0;
489 }
490
491 /* called when the child process has finished writing the record to the
492    database
493 */
494 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
495                              uint16_t flags, void *private_data)
496 {
497         struct childwrite_handle *h = talloc_get_type(private_data, 
498                                                      struct childwrite_handle);
499         void *p = h->private_data;
500         void (*callback)(int, void *) = h->callback;
501         pid_t child = h->child;
502         TALLOC_CTX *tmp_ctx = talloc_new(ev);
503         int ret;
504         char c;
505
506         CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
507         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
508
509         /* the handle needs to go away when the context is gone - when
510            the handle goes away this implicitly closes the pipe, which
511            kills the child */
512         talloc_steal(tmp_ctx, h);
513
514         talloc_set_destructor(h, NULL);
515
516         ret = read(h->fd[0], &c, 1);
517         if (ret < 1) {
518                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
519                 c = 1;
520         }
521
522         callback(c, p);
523
524         kill(child, SIGKILL);
525         talloc_free(tmp_ctx);
526 }
527
528 /* this creates a child process which will take out a tdb transaction
529    and write the record to the database.
530 */
531 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
532                                 void (*callback)(int, void *private_data),
533                                 struct ctdb_persistent_write_state *state)
534 {
535         struct childwrite_handle *result;
536         int ret;
537         pid_t parent = getpid();
538
539         CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
540         CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
541
542         if (!(result = talloc_zero(state, struct childwrite_handle))) {
543                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
544                 return NULL;
545         }
546
547         ret = pipe(result->fd);
548
549         if (ret != 0) {
550                 talloc_free(result);
551                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
552                 return NULL;
553         }
554
555         result->child = ctdb_fork(ctdb_db->ctdb);
556
557         if (result->child == (pid_t)-1) {
558                 close(result->fd[0]);
559                 close(result->fd[1]);
560                 talloc_free(result);
561                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
562                 return NULL;
563         }
564
565         result->callback = callback;
566         result->private_data = state;
567         result->ctdb = ctdb_db->ctdb;
568         result->ctdb_db = ctdb_db;
569
570         if (result->child == 0) {
571                 char c = 0;
572
573                 close(result->fd[0]);
574                 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
575                 ret = ctdb_persistent_store(state);
576                 if (ret != 0) {
577                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
578                         c = 1;
579                 }
580
581                 write(result->fd[1], &c, 1);
582
583                 /* make sure we die when our parent dies */
584                 while (kill(parent, 0) == 0 || errno != ESRCH) {
585                         sleep(5);
586                 }
587                 _exit(0);
588         }
589
590         close(result->fd[1]);
591         set_close_on_exec(result->fd[0]);
592
593         talloc_set_destructor(result, childwrite_destructor);
594
595         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
596
597         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
598                                    EVENT_FD_READ, childwrite_handler,
599                                    (void *)result);
600         if (result->fde == NULL) {
601                 talloc_free(result);
602                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
603                 return NULL;
604         }
605         tevent_fd_set_auto_close(result->fde);
606
607         result->start_time = timeval_current();
608
609         return result;
610 }
611
612 /* 
613    update a record on this node if the new record has a higher rsn than the
614    current record
615  */
616 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
617                                    struct ctdb_req_control *c, TDB_DATA recdata, 
618                                    bool *async_reply)
619 {
620         struct ctdb_db_context *ctdb_db;
621         struct ctdb_persistent_write_state *state;
622         struct childwrite_handle *handle;
623         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
624
625         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
626                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
627                 return -1;
628         }
629
630         ctdb_db = find_ctdb_db(ctdb, m->db_id);
631         if (ctdb_db == NULL) {
632                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
633                 return -1;
634         }
635
636         if (ctdb_db->unhealthy_reason) {
637                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
638                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
639                 return -1;
640         }
641
642         state = talloc(ctdb, struct ctdb_persistent_write_state);
643         CTDB_NO_MEMORY(ctdb, state);
644
645         state->ctdb_db = ctdb_db;
646         state->c       = c;
647         state->m       = m;
648
649         /* create a child process to take out a transaction and 
650            write the data.
651         */
652         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
653         if (handle == NULL) {
654                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
655                 talloc_free(state);
656                 return -1;
657         }
658
659         /* we need to wait for the replies */
660         *async_reply = true;
661
662         /* need to keep the control structure around */
663         talloc_steal(state, c);
664
665         /* but we won't wait forever */
666         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
667                         ctdb_persistent_lock_timeout, state);
668
669         return 0;
670 }
671
672
673 /*
674   called when a client has finished a local commit in a transaction to 
675   a persistent database
676  */
677 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
678                                      struct ctdb_req_control *c)
679 {
680         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
681         struct ctdb_db_context *ctdb_db;
682
683         ctdb_db = find_ctdb_db(ctdb, client->db_id);
684         if (ctdb_db == NULL) {
685                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
686                                  "Unknown database 0x%08x\n", client->db_id));
687                 return -1;
688         }
689         if (!ctdb_db->transaction_active) {
690                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
691                                  "Database 0x%08x has no transaction commit "
692                                  "started\n", client->db_id));
693                 return -1;
694         }
695
696         ctdb_db->transaction_active = false;
697         client->db_id = 0;
698
699         if (client->num_persistent_updates == 0) {
700                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
701                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
702                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
703                 return -1;
704         }
705         client->num_persistent_updates--;
706
707         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
708                             "transaction commit db_id[0x%08x]\n",
709                             client->client_id, ctdb_db->db_id));
710
711         return 0;
712 }
713
714 /*
715   called when a client gets an error committing its database
716   during a transaction commit
717  */
718 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
719                                   struct ctdb_req_control *c)
720 {
721         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
722         struct ctdb_db_context *ctdb_db;
723
724         ctdb_db = find_ctdb_db(ctdb, client->db_id);
725         if (ctdb_db == NULL) {
726                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
727                                  "Unknown database 0x%08x\n", client->db_id));
728                 return -1;
729         }
730         if (!ctdb_db->transaction_active) {
731                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
732                                  "Database 0x%08x has no transaction commit "
733                                  "started\n", client->db_id));
734                 return -1;
735         }
736
737         ctdb_db->transaction_active = false;
738         client->db_id = 0;
739
740         if (client->num_persistent_updates == 0) {
741                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
742         } else {
743                 client->num_persistent_updates--;
744         }
745
746         DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
747                          " db_id[0x%08x] - forcing recovery\n",
748                          ctdb_db->db_id));
749         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
750
751         return 0;
752 }
753
754 /**
755  * Tell whether a transaction is active on this node on the give DB.
756  */
757 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
758                                    struct ctdb_req_control *c,
759                                    uint32_t db_id)
760 {
761         struct ctdb_db_context *ctdb_db;
762         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
763
764         ctdb_db = find_ctdb_db(ctdb, db_id);
765         if (!ctdb_db) {
766                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
767                 return -1;
768         }
769
770         if (client->db_id == db_id) {
771                 return 0;
772         }
773
774         if (ctdb_db->transaction_active) {
775                 return 1;
776         } else {
777                 return 0;
778         }
779 }
780
781 /*
782   backwards compatibility:
783
784   start a persistent store operation. passing both the key, header and
785   data to the daemon. If the client disconnects before it has issued
786   a persistent_update call to the daemon we trigger a full recovery
787   to ensure the databases are brought back in sync.
788   for now we ignore the recdata that the client has passed to us.
789  */
790 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
791                                       struct ctdb_req_control *c,
792                                       TDB_DATA recdata)
793 {
794         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
795
796         if (client == NULL) {
797                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
798                 return -1;
799         }
800
801         client->num_persistent_updates++;
802
803         return 0;
804 }
805
806 /* 
807   backwards compatibility:
808
809   called to tell ctdbd that it is no longer doing a persistent update 
810 */
811 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
812                                               struct ctdb_req_control *c,
813                                               TDB_DATA recdata)
814 {
815         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
816
817         if (client == NULL) {
818                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
819                 return -1;
820         }
821
822         if (client->num_persistent_updates > 0) {
823                 client->num_persistent_updates--;
824         }
825
826         return 0;
827 }
828
829
830 /*
831   backwards compatibility:
832
833   single record varient of ctdb_control_trans2_commit for older clients
834  */
835 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
836                                       struct ctdb_req_control *c, 
837                                       TDB_DATA recdata, bool *async_reply)
838 {
839         struct ctdb_marshall_buffer *m;
840         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
841         TDB_DATA key, data;
842
843         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
844             rec->keylen + rec->datalen) {
845                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
846                 return -1;
847         }
848
849         key.dptr = &rec->data[0];
850         key.dsize = rec->keylen;
851         data.dptr = &rec->data[rec->keylen];
852         data.dsize = rec->datalen;
853
854         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
855         CTDB_NO_MEMORY(ctdb, m);
856
857         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
858 }
859
860 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
861                                   uint32_t db_id,
862                                   uint64_t *seqnum)
863 {
864         int32_t ret;
865         struct ctdb_db_context *ctdb_db;
866         const char *keyname = CTDB_DB_SEQNUM_KEY;
867         TDB_DATA key;
868         TDB_DATA data;
869         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
870
871         ctdb_db = find_ctdb_db(ctdb, db_id);
872         if (!ctdb_db) {
873                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
874                 ret = -1;
875                 goto done;
876         }
877
878         key.dptr = (uint8_t *)discard_const(keyname);
879         key.dsize = strlen(keyname) + 1;
880
881         ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
882         if (ret != 0) {
883                 goto done;
884         }
885
886         if (data.dsize != sizeof(uint64_t)) {
887                 *seqnum = 0;
888                 goto done;
889         }
890
891         *seqnum = *(uint64_t *)data.dptr;
892
893 done:
894         talloc_free(mem_ctx);
895         return ret;
896 }
897
898 /**
899  * Get the sequence number of a persistent database.
900  */
901 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
902                                    TDB_DATA indata,
903                                    TDB_DATA *outdata)
904 {
905         uint32_t db_id;
906         int32_t ret;
907         uint64_t seqnum;
908
909         db_id = *(uint32_t *)indata.dptr;
910         ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
911         if (ret != 0) {
912                 goto done;
913         }
914
915         outdata->dsize = sizeof(uint64_t);
916         outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
917         if (outdata->dptr == NULL) {
918                 ret = -1;
919                 goto done;
920         }
921
922         *(outdata->dptr) = seqnum;
923
924 done:
925         return ret;
926 }