persistent: if a node failed to update_record, trigger a recovery
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  Bookkeeping for one fan-out of CTDB_CONTROL_UPDATE_RECORD issued on
  behalf of a trans2/trans3 commit control.
 */
struct ctdb_persistent_state {
	struct ctdb_context *ctdb;
	struct ctdb_req_control *c;	/* the commit control we eventually answer */
	const char *errormsg;		/* error text of the last failed reply */
	uint32_t num_pending;		/* controls sent and not yet acknowledged successfully */
	int32_t status;			/* status of the last failed reply */
	uint32_t num_failed;		/* number of replies with a non-zero status */
	uint32_t num_sent;		/* number of controls successfully sent */
};
37
/*
  Outcomes that the UPDATE_RECORD fan-out issued by the commit controls
  below has to handle:
  1) all nodes fail, and all nodes reply
  2) some nodes fail, all nodes reply
  3) some nodes time out
  4) all nodes succeed
 */
44
45 /*
46   called when a node has acknowledged a ctdb_control_update_record call
47  */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49                                      int32_t status, TDB_DATA data, 
50                                      const char *errormsg,
51                                      void *private_data)
52 {
53         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
54                                                               struct ctdb_persistent_state);
55
56         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
57                 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
58                                    "during recovery\n"));
59                 return;
60         }
61
62         if (status != 0) {
63                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
64                          status, errormsg));
65                 state->status = status;
66                 state->errormsg = errormsg;
67                 state->num_failed++;
68
69                 /*
70                  * If a node failed to complete the update_record control,
71                  * then either a recovery is already running or something
72                  * bad is going on. So trigger a recovery and let the
73                  * recovery finish the transaction, sending back the reply
74                  * for the trans3_commit control to the client.
75                  */
76                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
77                 return;
78         }
79
80         state->num_pending--;
81         if (state->num_pending == 0) {
82                 enum ctdb_trans2_commit_error etype;
83                 if (state->num_failed == state->num_sent) {
84                         etype = CTDB_TRANS2_COMMIT_ALLFAIL;
85                 } else if (state->num_failed != 0) {
86                         etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
87                 } else {
88                         etype = CTDB_TRANS2_COMMIT_SUCCESS;
89                 }
90                 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
91                 talloc_free(state);
92         }
93 }
94
95 /*
96   called if persistent store times out
97  */
98 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
99                                          struct timeval t, void *private_data)
100 {
101         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
102
103         if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
104                 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
105                                    "timeout during recovery\n"));
106                 return;
107         }
108
109         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
110                                    "timeout in ctdb_persistent_state");
111
112         talloc_free(state);
113 }
114
115 /*
116   store a set of persistent records - called from a ctdb client when it has updated
117   some records in a persistent database. The client will have the record
118   locked for the duration of this call. The client is the dmaster when 
119   this call is made
120  */
121 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
122                                    struct ctdb_req_control *c, 
123                                    TDB_DATA recdata, bool *async_reply)
124 {
125         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
126         struct ctdb_persistent_state *state;
127         int i;
128         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
129         struct ctdb_db_context *ctdb_db;
130
131         ctdb_db = find_ctdb_db(ctdb, m->db_id);
132         if (ctdb_db == NULL) {
133                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
134                                  "Unknown database db_id[0x%08x]\n", m->db_id));
135                 return -1;
136         }
137
138         if (client == NULL) {
139                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
140                 return -1;
141         }
142
143         if (ctdb_db->unhealthy_reason) {
144                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
145                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
146                 return -1;
147         }
148
149         /* handling num_persistent_updates is a bit strange - 
150            there are 3 cases
151              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
152                 They don't expect num_persistent_updates to be used at all
153
154              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
155                 this commit to then decrement it
156
157              3) new clients which use TRANS2 commit functions, and
158                 expect this function to increment the counter, and
159                 then have it decremented in ctdb_control_trans2_error
160                 or ctdb_control_trans2_finished
161         */
162         switch (c->opcode) {
163         case CTDB_CONTROL_PERSISTENT_STORE:
164                 if (ctdb_db->transaction_active) {
165                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
166                                           "transaction is active on database "
167                                           "db_id[0x%08x] - refusing persistent "
168                                          " store for client id[0x%08x]\n",
169                                           ctdb_db->db_id, client->client_id));
170                         return -1;
171                 }
172                 if (client->num_persistent_updates > 0) {
173                         client->num_persistent_updates--;
174                 }
175                 break;
176         case CTDB_CONTROL_TRANS2_COMMIT:
177                 if (ctdb_db->transaction_active) {
178                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
179                                          " already a transaction commit "
180                                          "active on db_id[0x%08x] - forbidding "
181                                          "client_id[0x%08x] to commit\n",
182                                          ctdb_db->db_id, client->client_id));
183                         return -1;
184                 }
185                 if (client->db_id != 0) {
186                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
187                                          "client-db_id[0x%08x] != 0 "
188                                          "(client_id[0x%08x])\n",
189                                          client->db_id, client->client_id));
190                         return -1;
191                 }
192                 client->num_persistent_updates++;
193                 ctdb_db->transaction_active = true;
194                 client->db_id = m->db_id;
195                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
196                                   " commit transaction on db id[0x%08x]\n",
197                                   client->client_id, client->db_id));
198                 break;
199         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
200                 /* already updated from the first commit */
201                 if (client->db_id != m->db_id) {
202                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
203                                          "retry: client-db_id[0x%08x] != "
204                                          "db_id[0x%08x] (client_id[0x%08x])\n",
205                                          client->db_id,
206                                          m->db_id, client->client_id));
207                         return -1;
208                 }
209                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
210                                     "transaction commit retry on "
211                                     "db_id[0x%08x]\n",
212                                     client->client_id, client->db_id));
213                 break;
214         }
215
216         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
217                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
218                 return -1;
219         }
220
221         state = talloc_zero(ctdb, struct ctdb_persistent_state);
222         CTDB_NO_MEMORY(ctdb, state);
223
224         state->ctdb = ctdb;
225         state->c    = c;
226
227         for (i=0;i<ctdb->vnn_map->size;i++) {
228                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
229                 int ret;
230
231                 /* only send to active nodes */
232                 if (node->flags & NODE_FLAGS_INACTIVE) {
233                         continue;
234                 }
235
236                 /* don't send to ourselves */
237                 if (node->pnn == ctdb->pnn) {
238                         continue;
239                 }
240                 
241                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
242                                                c->client_id, 0, recdata, 
243                                                ctdb_persistent_callback, state);
244                 if (ret == -1) {
245                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
246                         talloc_free(state);
247                         return -1;
248                 }
249
250                 state->num_pending++;
251                 state->num_sent++;
252         }
253
254         if (state->num_pending == 0) {
255                 talloc_free(state);
256                 return 0;
257         }
258         
259         /* we need to wait for the replies */
260         *async_reply = true;
261
262         /* need to keep the control structure around */
263         talloc_steal(state, c);
264
265         /* but we won't wait forever */
266         event_add_timed(ctdb->ev, state, 
267                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
268                         ctdb_persistent_store_timeout, state);
269
270         return 0;
271 }
272
273
274 /*
275  * Store a set of persistent records.
276  * This is used to roll out a transaction to all nodes.
277  */
278 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
279                                    struct ctdb_req_control *c,
280                                    TDB_DATA recdata, bool *async_reply)
281 {
282         struct ctdb_client *client;
283         struct ctdb_persistent_state *state;
284         int i;
285         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
286         struct ctdb_db_context *ctdb_db;
287
288         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
289                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
290                 return -1;
291         }
292
293         ctdb_db = find_ctdb_db(ctdb, m->db_id);
294         if (ctdb_db == NULL) {
295                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
296                                  "Unknown database db_id[0x%08x]\n", m->db_id));
297                 return -1;
298         }
299
300         client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
301         if (client == NULL) {
302                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
303                                  "to a client. Returning error\n"));
304                 return -1;
305         }
306
307         state = talloc_zero(ctdb, struct ctdb_persistent_state);
308         CTDB_NO_MEMORY(ctdb, state);
309
310         state->ctdb = ctdb;
311         state->c    = c;
312
313         for (i = 0; i < ctdb->vnn_map->size; i++) {
314                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
315                 int ret;
316
317                 /* only send to active nodes */
318                 if (node->flags & NODE_FLAGS_INACTIVE) {
319                         continue;
320                 }
321
322                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
323                                                CTDB_CONTROL_UPDATE_RECORD,
324                                                c->client_id, 0, recdata,
325                                                ctdb_persistent_callback,
326                                                state);
327                 if (ret == -1) {
328                         DEBUG(DEBUG_ERR,("Unable to send "
329                                          "CTDB_CONTROL_UPDATE_RECORD "
330                                          "to pnn %u\n", node->pnn));
331                         talloc_free(state);
332                         return -1;
333                 }
334
335                 state->num_pending++;
336                 state->num_sent++;
337         }
338
339         if (state->num_pending == 0) {
340                 talloc_free(state);
341                 return 0;
342         }
343
344         /* we need to wait for the replies */
345         *async_reply = true;
346
347         /* need to keep the control structure around */
348         talloc_steal(state, c);
349
350         /* but we won't wait forever */
351         event_add_timed(ctdb->ev, state,
352                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
353                         ctdb_persistent_store_timeout, state);
354
355         return 0;
356 }
357
358
/*
  Context for one local CTDB_CONTROL_UPDATE_RECORD.  It is shared by
  ctdb_control_update_record, the childwrite child
  (ctdb_persistent_store) and the completion/timeout callbacks.
 */
struct ctdb_persistent_write_state {
	struct ctdb_db_context *ctdb_db;	/* database the records are written to */
	struct ctdb_marshall_buffer *m;		/* records to write; points into the control's recdata */
	struct ctdb_req_control *c;		/* UPDATE_RECORD control answered when done */
};
364
365
366 /*
367   called from a child process to write the data
368  */
369 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
370 {
371         int ret, i;
372         struct ctdb_rec_data *rec = NULL;
373         struct ctdb_marshall_buffer *m = state->m;
374
375         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
376         if (ret == -1) {
377                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
378                                  state->ctdb_db->db_id));
379                 return -1;
380         }
381
382         for (i=0;i<m->count;i++) {
383                 struct ctdb_ltdb_header oldheader;
384                 struct ctdb_ltdb_header header;
385                 TDB_DATA key, data, olddata;
386                 TALLOC_CTX *tmp_ctx = talloc_new(state);
387
388                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
389                 
390                 if (rec == NULL) {
391                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
392                                          i, state->ctdb_db->db_id));
393                         talloc_free(tmp_ctx);
394                         goto failed;                    
395                 }
396
397                 /* fetch the old header and ensure the rsn is less than the new rsn */
398                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
399                 if (ret != 0) {
400                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
401                                          state->ctdb_db->db_id));
402                         talloc_free(tmp_ctx);
403                         goto failed;
404                 }
405
406                 if (oldheader.rsn >= header.rsn &&
407                     (olddata.dsize != data.dsize || 
408                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
409                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
410                                           state->ctdb_db->db_id, 
411                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
412                         talloc_free(tmp_ctx);
413                         goto failed;
414                 }
415
416                 talloc_free(tmp_ctx);
417
418                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
419                 if (ret != 0) {
420                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
421                                           state->ctdb_db->db_id));
422                         goto failed;
423                 }
424         }
425
426         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
427         if (ret == -1) {
428                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
429                                  state->ctdb_db->db_id));
430                 return -1;
431         }
432
433         return 0;
434         
435 failed:
436         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
437         return -1;
438 }
439
440
441 /*
442   called when we the child has completed the persistent write
443   on our behalf
444  */
445 static void ctdb_persistent_write_callback(int status, void *private_data)
446 {
447         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
448                                                                    struct ctdb_persistent_write_state);
449
450
451         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
452
453         talloc_free(state);
454 }
455
456 /*
457   called if our lockwait child times out
458  */
459 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
460                                          struct timeval t, void *private_data)
461 {
462         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
463                                                                    struct ctdb_persistent_write_state);
464         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
465         talloc_free(state);
466 }
467
/*
  Handle for one childwrite child process.  It is allocated under the
  ctdb_persistent_write_state passed to ctdb_childwrite.
 */
struct childwrite_handle {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	struct fd_event *fde;		/* read event on fd[0] */
	int fd[2];			/* pipe; the child writes one status byte to fd[1] */
	pid_t child;			/* pid of the forked writer */
	void *private_data;		/* passed to callback */
	void (*callback)(int, void *);	/* called with the child's status byte */
	struct timeval start_time;	/* start of the childwrite latency measurement */
};
478
479 static int childwrite_destructor(struct childwrite_handle *h)
480 {
481         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
482         kill(h->child, SIGKILL);
483         return 0;
484 }
485
486 /* called when the child process has finished writing the record to the
487    database
488 */
489 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
490                              uint16_t flags, void *private_data)
491 {
492         struct childwrite_handle *h = talloc_get_type(private_data, 
493                                                      struct childwrite_handle);
494         void *p = h->private_data;
495         void (*callback)(int, void *) = h->callback;
496         pid_t child = h->child;
497         TALLOC_CTX *tmp_ctx = talloc_new(ev);
498         int ret;
499         char c;
500
501         CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
502         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
503
504         /* the handle needs to go away when the context is gone - when
505            the handle goes away this implicitly closes the pipe, which
506            kills the child */
507         talloc_steal(tmp_ctx, h);
508
509         talloc_set_destructor(h, NULL);
510
511         ret = read(h->fd[0], &c, 1);
512         if (ret < 1) {
513                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
514                 c = 1;
515         }
516
517         callback(c, p);
518
519         kill(child, SIGKILL);
520         talloc_free(tmp_ctx);
521 }
522
523 /* this creates a child process which will take out a tdb transaction
524    and write the record to the database.
525 */
526 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
527                                 void (*callback)(int, void *private_data),
528                                 struct ctdb_persistent_write_state *state)
529 {
530         struct childwrite_handle *result;
531         int ret;
532         pid_t parent = getpid();
533
534         CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
535         CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
536
537         if (!(result = talloc_zero(state, struct childwrite_handle))) {
538                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
539                 return NULL;
540         }
541
542         ret = pipe(result->fd);
543
544         if (ret != 0) {
545                 talloc_free(result);
546                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
547                 return NULL;
548         }
549
550         result->child = ctdb_fork(ctdb_db->ctdb);
551
552         if (result->child == (pid_t)-1) {
553                 close(result->fd[0]);
554                 close(result->fd[1]);
555                 talloc_free(result);
556                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
557                 return NULL;
558         }
559
560         result->callback = callback;
561         result->private_data = state;
562         result->ctdb = ctdb_db->ctdb;
563         result->ctdb_db = ctdb_db;
564
565         if (result->child == 0) {
566                 char c = 0;
567
568                 close(result->fd[0]);
569                 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
570                 ret = ctdb_persistent_store(state);
571                 if (ret != 0) {
572                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
573                         c = 1;
574                 }
575
576                 write(result->fd[1], &c, 1);
577
578                 /* make sure we die when our parent dies */
579                 while (kill(parent, 0) == 0 || errno != ESRCH) {
580                         sleep(5);
581                 }
582                 _exit(0);
583         }
584
585         close(result->fd[1]);
586         set_close_on_exec(result->fd[0]);
587
588         talloc_set_destructor(result, childwrite_destructor);
589
590         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
591
592         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
593                                    EVENT_FD_READ, childwrite_handler,
594                                    (void *)result);
595         if (result->fde == NULL) {
596                 talloc_free(result);
597                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
598                 return NULL;
599         }
600         tevent_fd_set_auto_close(result->fde);
601
602         result->start_time = timeval_current();
603
604         return result;
605 }
606
607 /* 
608    update a record on this node if the new record has a higher rsn than the
609    current record
610  */
611 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
612                                    struct ctdb_req_control *c, TDB_DATA recdata, 
613                                    bool *async_reply)
614 {
615         struct ctdb_db_context *ctdb_db;
616         struct ctdb_persistent_write_state *state;
617         struct childwrite_handle *handle;
618         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
619
620         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
621                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
622                 return -1;
623         }
624
625         ctdb_db = find_ctdb_db(ctdb, m->db_id);
626         if (ctdb_db == NULL) {
627                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
628                 return -1;
629         }
630
631         if (ctdb_db->unhealthy_reason) {
632                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
633                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
634                 return -1;
635         }
636
637         state = talloc(ctdb, struct ctdb_persistent_write_state);
638         CTDB_NO_MEMORY(ctdb, state);
639
640         state->ctdb_db = ctdb_db;
641         state->c       = c;
642         state->m       = m;
643
644         /* create a child process to take out a transaction and 
645            write the data.
646         */
647         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
648         if (handle == NULL) {
649                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
650                 talloc_free(state);
651                 return -1;
652         }
653
654         /* we need to wait for the replies */
655         *async_reply = true;
656
657         /* need to keep the control structure around */
658         talloc_steal(state, c);
659
660         /* but we won't wait forever */
661         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
662                         ctdb_persistent_lock_timeout, state);
663
664         return 0;
665 }
666
667
668 /*
669   called when a client has finished a local commit in a transaction to 
670   a persistent database
671  */
672 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
673                                      struct ctdb_req_control *c)
674 {
675         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
676         struct ctdb_db_context *ctdb_db;
677
678         ctdb_db = find_ctdb_db(ctdb, client->db_id);
679         if (ctdb_db == NULL) {
680                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
681                                  "Unknown database 0x%08x\n", client->db_id));
682                 return -1;
683         }
684         if (!ctdb_db->transaction_active) {
685                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
686                                  "Database 0x%08x has no transaction commit "
687                                  "started\n", client->db_id));
688                 return -1;
689         }
690
691         ctdb_db->transaction_active = false;
692         client->db_id = 0;
693
694         if (client->num_persistent_updates == 0) {
695                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
696                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
697                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
698                 return -1;
699         }
700         client->num_persistent_updates--;
701
702         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
703                             "transaction commit db_id[0x%08x]\n",
704                             client->client_id, ctdb_db->db_id));
705
706         return 0;
707 }
708
709 /*
710   called when a client gets an error committing its database
711   during a transaction commit
712  */
713 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
714                                   struct ctdb_req_control *c)
715 {
716         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
717         struct ctdb_db_context *ctdb_db;
718
719         ctdb_db = find_ctdb_db(ctdb, client->db_id);
720         if (ctdb_db == NULL) {
721                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
722                                  "Unknown database 0x%08x\n", client->db_id));
723                 return -1;
724         }
725         if (!ctdb_db->transaction_active) {
726                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
727                                  "Database 0x%08x has no transaction commit "
728                                  "started\n", client->db_id));
729                 return -1;
730         }
731
732         ctdb_db->transaction_active = false;
733         client->db_id = 0;
734
735         if (client->num_persistent_updates == 0) {
736                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
737         } else {
738                 client->num_persistent_updates--;
739         }
740
741         DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
742                          " db_id[0x%08x] - forcing recovery\n",
743                          ctdb_db->db_id));
744         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
745
746         return 0;
747 }
748
749 /**
750  * Tell whether a transaction is active on this node on the give DB.
751  */
752 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
753                                    struct ctdb_req_control *c,
754                                    uint32_t db_id)
755 {
756         struct ctdb_db_context *ctdb_db;
757         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
758
759         ctdb_db = find_ctdb_db(ctdb, db_id);
760         if (!ctdb_db) {
761                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
762                 return -1;
763         }
764
765         if (client->db_id == db_id) {
766                 return 0;
767         }
768
769         if (ctdb_db->transaction_active) {
770                 return 1;
771         } else {
772                 return 0;
773         }
774 }
775
776 /*
777   backwards compatibility:
778
779   start a persistent store operation. passing both the key, header and
780   data to the daemon. If the client disconnects before it has issued
781   a persistent_update call to the daemon we trigger a full recovery
782   to ensure the databases are brought back in sync.
783   for now we ignore the recdata that the client has passed to us.
784  */
785 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
786                                       struct ctdb_req_control *c,
787                                       TDB_DATA recdata)
788 {
789         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
790
791         if (client == NULL) {
792                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
793                 return -1;
794         }
795
796         client->num_persistent_updates++;
797
798         return 0;
799 }
800
801 /* 
802   backwards compatibility:
803
804   called to tell ctdbd that it is no longer doing a persistent update 
805 */
806 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
807                                               struct ctdb_req_control *c,
808                                               TDB_DATA recdata)
809 {
810         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
811
812         if (client == NULL) {
813                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
814                 return -1;
815         }
816
817         if (client->num_persistent_updates > 0) {
818                 client->num_persistent_updates--;
819         }
820
821         return 0;
822 }
823
824
825 /*
826   backwards compatibility:
827
828   single record varient of ctdb_control_trans2_commit for older clients
829  */
830 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
831                                       struct ctdb_req_control *c, 
832                                       TDB_DATA recdata, bool *async_reply)
833 {
834         struct ctdb_marshall_buffer *m;
835         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
836         TDB_DATA key, data;
837
838         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
839             rec->keylen + rec->datalen) {
840                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
841                 return -1;
842         }
843
844         key.dptr = &rec->data[0];
845         key.dsize = rec->keylen;
846         data.dptr = &rec->data[rec->keylen];
847         data.dsize = rec->datalen;
848
849         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
850         CTDB_NO_MEMORY(ctdb, m);
851
852         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
853 }
854
855 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
856                                   uint32_t db_id,
857                                   uint64_t *seqnum)
858 {
859         int32_t ret;
860         struct ctdb_db_context *ctdb_db;
861         const char *keyname = CTDB_DB_SEQNUM_KEY;
862         TDB_DATA key;
863         TDB_DATA data;
864         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
865
866         ctdb_db = find_ctdb_db(ctdb, db_id);
867         if (!ctdb_db) {
868                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
869                 ret = -1;
870                 goto done;
871         }
872
873         key.dptr = (uint8_t *)discard_const(keyname);
874         key.dsize = strlen(keyname) + 1;
875
876         ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
877         if (ret != 0) {
878                 goto done;
879         }
880
881         if (data.dsize != sizeof(uint64_t)) {
882                 *seqnum = 0;
883                 goto done;
884         }
885
886         *seqnum = *(uint64_t *)data.dptr;
887
888 done:
889         talloc_free(mem_ctx);
890         return ret;
891 }
892
893 /**
894  * Get the sequence number of a persistent database.
895  */
896 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
897                                    TDB_DATA indata,
898                                    TDB_DATA *outdata)
899 {
900         uint32_t db_id;
901         int32_t ret;
902         uint64_t seqnum;
903
904         db_id = *(uint32_t *)indata.dptr;
905         ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
906         if (ret != 0) {
907                 goto done;
908         }
909
910         outdata->dsize = sizeof(uint64_t);
911         outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
912         if (outdata->dptr == NULL) {
913                 ret = -1;
914                 goto done;
915         }
916
917         *(outdata->dptr) = seqnum;
918
919 done:
920         return ret;
921 }