a6fcf48e3a643099ff25ab91aa805990bf69e795
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  bookkeeping for one commit that has been sent out to other nodes
  with CTDB_CONTROL_UPDATE_RECORD and is waiting for their replies
 */
struct ctdb_persistent_state {
	/* daemon context the commit was started in */
	struct ctdb_context *ctdb;
	/* the client's control request that still needs a reply */
	struct ctdb_req_control *c;
	/* error message taken from the most recent failed reply */
	const char *errormsg;
	/* controls sent that have not been successfully acknowledged yet */
	uint32_t num_pending;
	/* status taken from the most recent failed reply */
	int32_t status;
	/* number of replies that carried a non-zero status */
	uint32_t num_failed;
	/* number of controls that were sent out */
	uint32_t num_sent;
};
37
38 /*
39   1) all nodes fail, and all nodes reply
40   2) some nodes fail, all nodes reply
41   3) some nodes timeout
42   4) all nodes succeed
43  */
44
45 /*
46   called when a node has acknowledged a ctdb_control_update_record call
47  */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49                                      int32_t status, TDB_DATA data, 
50                                      const char *errormsg,
51                                      void *private_data)
52 {
53         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
54                                                               struct ctdb_persistent_state);
55         enum ctdb_trans2_commit_error etype;
56
57         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
58                 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
59                                    "during recovery\n"));
60                 return;
61         }
62
63         if (status != 0) {
64                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
65                          status, errormsg?errormsg:"no error message given"));
66                 state->status = status;
67                 state->errormsg = errormsg;
68                 state->num_failed++;
69
70                 /*
71                  * If a node failed to complete the update_record control,
72                  * then either a recovery is already running or something
73                  * bad is going on. So trigger a recovery and let the
74                  * recovery finish the transaction, sending back the reply
75                  * for the trans3_commit control to the client.
76                  */
77                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
78                 return;
79         }
80
81         state->num_pending--;
82
83         if (state->num_pending != 0) {
84                 return;
85         }
86
87         if (state->num_failed == state->num_sent) {
88                 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
89         } else if (state->num_failed != 0) {
90                 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
91         } else {
92                 etype = CTDB_TRANS2_COMMIT_SUCCESS;
93         }
94
95         ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
96         talloc_free(state);
97 }
98
99 /*
100   called if persistent store times out
101  */
102 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
103                                          struct timeval t, void *private_data)
104 {
105         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
106
107         if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
108                 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
109                                    "timeout during recovery\n"));
110                 return;
111         }
112
113         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
114                                    "timeout in ctdb_persistent_state");
115
116         talloc_free(state);
117 }
118
119 /*
120   store a set of persistent records - called from a ctdb client when it has updated
121   some records in a persistent database. The client will have the record
122   locked for the duration of this call. The client is the dmaster when 
123   this call is made
124  */
125 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
126                                    struct ctdb_req_control *c, 
127                                    TDB_DATA recdata, bool *async_reply)
128 {
129         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
130         struct ctdb_persistent_state *state;
131         int i;
132         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
133         struct ctdb_db_context *ctdb_db;
134
135         ctdb_db = find_ctdb_db(ctdb, m->db_id);
136         if (ctdb_db == NULL) {
137                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
138                                  "Unknown database db_id[0x%08x]\n", m->db_id));
139                 return -1;
140         }
141
142         if (client == NULL) {
143                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
144                 return -1;
145         }
146
147         if (ctdb_db->unhealthy_reason) {
148                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
149                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
150                 return -1;
151         }
152
153         /* handling num_persistent_updates is a bit strange - 
154            there are 3 cases
155              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
156                 They don't expect num_persistent_updates to be used at all
157
158              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
159                 this commit to then decrement it
160
161              3) new clients which use TRANS2 commit functions, and
162                 expect this function to increment the counter, and
163                 then have it decremented in ctdb_control_trans2_error
164                 or ctdb_control_trans2_finished
165         */
166         switch (c->opcode) {
167         case CTDB_CONTROL_PERSISTENT_STORE:
168                 if (ctdb_db->transaction_active) {
169                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
170                                           "transaction is active on database "
171                                           "db_id[0x%08x] - refusing persistent "
172                                          " store for client id[0x%08x]\n",
173                                           ctdb_db->db_id, client->client_id));
174                         return -1;
175                 }
176                 if (client->num_persistent_updates > 0) {
177                         client->num_persistent_updates--;
178                 }
179                 break;
180         case CTDB_CONTROL_TRANS2_COMMIT:
181                 if (ctdb_db->transaction_active) {
182                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
183                                          " already a transaction commit "
184                                          "active on db_id[0x%08x] - forbidding "
185                                          "client_id[0x%08x] to commit\n",
186                                          ctdb_db->db_id, client->client_id));
187                         return -1;
188                 }
189                 if (client->db_id != 0) {
190                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
191                                          "client-db_id[0x%08x] != 0 "
192                                          "(client_id[0x%08x])\n",
193                                          client->db_id, client->client_id));
194                         return -1;
195                 }
196                 client->num_persistent_updates++;
197                 ctdb_db->transaction_active = true;
198                 client->db_id = m->db_id;
199                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
200                                   " commit transaction on db id[0x%08x]\n",
201                                   client->client_id, client->db_id));
202                 break;
203         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
204                 /* already updated from the first commit */
205                 if (client->db_id != m->db_id) {
206                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
207                                          "retry: client-db_id[0x%08x] != "
208                                          "db_id[0x%08x] (client_id[0x%08x])\n",
209                                          client->db_id,
210                                          m->db_id, client->client_id));
211                         return -1;
212                 }
213                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
214                                     "transaction commit retry on "
215                                     "db_id[0x%08x]\n",
216                                     client->client_id, client->db_id));
217                 break;
218         }
219
220         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
221                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
222                 return -1;
223         }
224
225         state = talloc_zero(ctdb, struct ctdb_persistent_state);
226         CTDB_NO_MEMORY(ctdb, state);
227
228         state->ctdb = ctdb;
229         state->c    = c;
230
231         for (i=0;i<ctdb->vnn_map->size;i++) {
232                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
233                 int ret;
234
235                 /* only send to active nodes */
236                 if (node->flags & NODE_FLAGS_INACTIVE) {
237                         continue;
238                 }
239
240                 /* don't send to ourselves */
241                 if (node->pnn == ctdb->pnn) {
242                         continue;
243                 }
244                 
245                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
246                                                c->client_id, 0, recdata, 
247                                                ctdb_persistent_callback, state);
248                 if (ret == -1) {
249                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
250                         talloc_free(state);
251                         return -1;
252                 }
253
254                 state->num_pending++;
255                 state->num_sent++;
256         }
257
258         if (state->num_pending == 0) {
259                 talloc_free(state);
260                 return 0;
261         }
262         
263         /* we need to wait for the replies */
264         *async_reply = true;
265
266         /* need to keep the control structure around */
267         talloc_steal(state, c);
268
269         /* but we won't wait forever */
270         event_add_timed(ctdb->ev, state, 
271                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
272                         ctdb_persistent_store_timeout, state);
273
274         return 0;
275 }
276
277
278 /*
279  * Store a set of persistent records.
280  * This is used to roll out a transaction to all nodes.
281  */
282 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
283                                    struct ctdb_req_control *c,
284                                    TDB_DATA recdata, bool *async_reply)
285 {
286         struct ctdb_client *client;
287         struct ctdb_persistent_state *state;
288         int i;
289         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
290         struct ctdb_db_context *ctdb_db;
291
292         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
293                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
294                 return -1;
295         }
296
297         ctdb_db = find_ctdb_db(ctdb, m->db_id);
298         if (ctdb_db == NULL) {
299                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
300                                  "Unknown database db_id[0x%08x]\n", m->db_id));
301                 return -1;
302         }
303
304         client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
305         if (client == NULL) {
306                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
307                                  "to a client. Returning error\n"));
308                 return -1;
309         }
310
311         state = talloc_zero(ctdb, struct ctdb_persistent_state);
312         CTDB_NO_MEMORY(ctdb, state);
313
314         state->ctdb = ctdb;
315         state->c    = c;
316
317         for (i = 0; i < ctdb->vnn_map->size; i++) {
318                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
319                 int ret;
320
321                 /* only send to active nodes */
322                 if (node->flags & NODE_FLAGS_INACTIVE) {
323                         continue;
324                 }
325
326                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
327                                                CTDB_CONTROL_UPDATE_RECORD,
328                                                c->client_id, 0, recdata,
329                                                ctdb_persistent_callback,
330                                                state);
331                 if (ret == -1) {
332                         DEBUG(DEBUG_ERR,("Unable to send "
333                                          "CTDB_CONTROL_UPDATE_RECORD "
334                                          "to pnn %u\n", node->pnn));
335                         talloc_free(state);
336                         return -1;
337                 }
338
339                 state->num_pending++;
340                 state->num_sent++;
341         }
342
343         if (state->num_pending == 0) {
344                 talloc_free(state);
345                 return 0;
346         }
347
348         /* we need to wait for the replies */
349         *async_reply = true;
350
351         /* need to keep the control structure around */
352         talloc_steal(state, c);
353
354         /* but we won't wait forever */
355         event_add_timed(ctdb->ev, state,
356                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
357                         ctdb_persistent_store_timeout, state);
358
359         return 0;
360 }
361
362
/*
  everything needed to apply one CTDB_CONTROL_UPDATE_RECORD locally
  and to answer it afterwards
 */
struct ctdb_persistent_write_state {
	/* database the records are written to */
	struct ctdb_db_context *ctdb_db;
	/* marshalled records taken from the control's payload */
	struct ctdb_marshall_buffer *m;
	/* the control request that still needs a reply */
	struct ctdb_req_control *c;
};
368
369
370 /*
371   called from a child process to write the data
372  */
373 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
374 {
375         int ret, i;
376         struct ctdb_rec_data *rec = NULL;
377         struct ctdb_marshall_buffer *m = state->m;
378
379         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
380         if (ret == -1) {
381                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
382                                  state->ctdb_db->db_id));
383                 return -1;
384         }
385
386         for (i=0;i<m->count;i++) {
387                 struct ctdb_ltdb_header oldheader;
388                 struct ctdb_ltdb_header header;
389                 TDB_DATA key, data, olddata;
390                 TALLOC_CTX *tmp_ctx = talloc_new(state);
391
392                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
393                 
394                 if (rec == NULL) {
395                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
396                                          i, state->ctdb_db->db_id));
397                         talloc_free(tmp_ctx);
398                         goto failed;                    
399                 }
400
401                 /* fetch the old header and ensure the rsn is less than the new rsn */
402                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
403                 if (ret != 0) {
404                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
405                                          state->ctdb_db->db_id));
406                         talloc_free(tmp_ctx);
407                         goto failed;
408                 }
409
410                 if (oldheader.rsn >= header.rsn &&
411                     (olddata.dsize != data.dsize || 
412                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
413                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
414                                           state->ctdb_db->db_id, 
415                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
416                         talloc_free(tmp_ctx);
417                         goto failed;
418                 }
419
420                 talloc_free(tmp_ctx);
421
422                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
423                 if (ret != 0) {
424                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
425                                           state->ctdb_db->db_id));
426                         goto failed;
427                 }
428         }
429
430         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
431         if (ret == -1) {
432                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
433                                  state->ctdb_db->db_id));
434                 return -1;
435         }
436
437         return 0;
438         
439 failed:
440         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
441         return -1;
442 }
443
444
445 /*
446   called when we the child has completed the persistent write
447   on our behalf
448  */
449 static void ctdb_persistent_write_callback(int status, void *private_data)
450 {
451         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
452                                                                    struct ctdb_persistent_write_state);
453
454
455         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
456
457         talloc_free(state);
458 }
459
460 /*
461   called if our lockwait child times out
462  */
463 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
464                                          struct timeval t, void *private_data)
465 {
466         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
467                                                                    struct ctdb_persistent_write_state);
468         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
469         talloc_free(state);
470 }
471
/*
  handle for one child process forked by ctdb_childwrite
 */
struct childwrite_handle {
	/* daemon context, used for the statistics counters */
	struct ctdb_context *ctdb;
	/* database the child writes to */
	struct ctdb_db_context *ctdb_db;
	/* fd event watching the read end of the pipe */
	struct fd_event *fde;
	/* pipe to the child: the parent reads fd[0], the child writes fd[1] */
	int fd[2];
	/* pid of the forked child */
	pid_t child;
	/* second argument handed to callback */
	void *private_data;
	/* invoked with the status byte read from the child */
	void (*callback)(int, void *);
	/* taken when the handle was set up, used for the latency statistic */
	struct timeval start_time;
};
482
483 static int childwrite_destructor(struct childwrite_handle *h)
484 {
485         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
486         kill(h->child, SIGKILL);
487         return 0;
488 }
489
490 /* called when the child process has finished writing the record to the
491    database
492 */
493 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
494                              uint16_t flags, void *private_data)
495 {
496         struct childwrite_handle *h = talloc_get_type(private_data, 
497                                                      struct childwrite_handle);
498         void *p = h->private_data;
499         void (*callback)(int, void *) = h->callback;
500         pid_t child = h->child;
501         TALLOC_CTX *tmp_ctx = talloc_new(ev);
502         int ret;
503         char c;
504
505         CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
506         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
507
508         /* the handle needs to go away when the context is gone - when
509            the handle goes away this implicitly closes the pipe, which
510            kills the child */
511         talloc_steal(tmp_ctx, h);
512
513         talloc_set_destructor(h, NULL);
514
515         ret = read(h->fd[0], &c, 1);
516         if (ret < 1) {
517                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
518                 c = 1;
519         }
520
521         callback(c, p);
522
523         kill(child, SIGKILL);
524         talloc_free(tmp_ctx);
525 }
526
527 /* this creates a child process which will take out a tdb transaction
528    and write the record to the database.
529 */
530 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
531                                 void (*callback)(int, void *private_data),
532                                 struct ctdb_persistent_write_state *state)
533 {
534         struct childwrite_handle *result;
535         int ret;
536         pid_t parent = getpid();
537
538         CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
539         CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
540
541         if (!(result = talloc_zero(state, struct childwrite_handle))) {
542                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
543                 return NULL;
544         }
545
546         ret = pipe(result->fd);
547
548         if (ret != 0) {
549                 talloc_free(result);
550                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
551                 return NULL;
552         }
553
554         result->child = ctdb_fork(ctdb_db->ctdb);
555
556         if (result->child == (pid_t)-1) {
557                 close(result->fd[0]);
558                 close(result->fd[1]);
559                 talloc_free(result);
560                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
561                 return NULL;
562         }
563
564         result->callback = callback;
565         result->private_data = state;
566         result->ctdb = ctdb_db->ctdb;
567         result->ctdb_db = ctdb_db;
568
569         if (result->child == 0) {
570                 char c = 0;
571
572                 close(result->fd[0]);
573                 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
574                 ret = ctdb_persistent_store(state);
575                 if (ret != 0) {
576                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
577                         c = 1;
578                 }
579
580                 write(result->fd[1], &c, 1);
581
582                 /* make sure we die when our parent dies */
583                 while (kill(parent, 0) == 0 || errno != ESRCH) {
584                         sleep(5);
585                 }
586                 _exit(0);
587         }
588
589         close(result->fd[1]);
590         set_close_on_exec(result->fd[0]);
591
592         talloc_set_destructor(result, childwrite_destructor);
593
594         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
595
596         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
597                                    EVENT_FD_READ, childwrite_handler,
598                                    (void *)result);
599         if (result->fde == NULL) {
600                 talloc_free(result);
601                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
602                 return NULL;
603         }
604         tevent_fd_set_auto_close(result->fde);
605
606         result->start_time = timeval_current();
607
608         return result;
609 }
610
611 /* 
612    update a record on this node if the new record has a higher rsn than the
613    current record
614  */
615 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
616                                    struct ctdb_req_control *c, TDB_DATA recdata, 
617                                    bool *async_reply)
618 {
619         struct ctdb_db_context *ctdb_db;
620         struct ctdb_persistent_write_state *state;
621         struct childwrite_handle *handle;
622         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
623
624         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
625                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
626                 return -1;
627         }
628
629         ctdb_db = find_ctdb_db(ctdb, m->db_id);
630         if (ctdb_db == NULL) {
631                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
632                 return -1;
633         }
634
635         if (ctdb_db->unhealthy_reason) {
636                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
637                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
638                 return -1;
639         }
640
641         state = talloc(ctdb, struct ctdb_persistent_write_state);
642         CTDB_NO_MEMORY(ctdb, state);
643
644         state->ctdb_db = ctdb_db;
645         state->c       = c;
646         state->m       = m;
647
648         /* create a child process to take out a transaction and 
649            write the data.
650         */
651         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
652         if (handle == NULL) {
653                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
654                 talloc_free(state);
655                 return -1;
656         }
657
658         /* we need to wait for the replies */
659         *async_reply = true;
660
661         /* need to keep the control structure around */
662         talloc_steal(state, c);
663
664         /* but we won't wait forever */
665         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
666                         ctdb_persistent_lock_timeout, state);
667
668         return 0;
669 }
670
671
672 /*
673   called when a client has finished a local commit in a transaction to 
674   a persistent database
675  */
676 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
677                                      struct ctdb_req_control *c)
678 {
679         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
680         struct ctdb_db_context *ctdb_db;
681
682         ctdb_db = find_ctdb_db(ctdb, client->db_id);
683         if (ctdb_db == NULL) {
684                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
685                                  "Unknown database 0x%08x\n", client->db_id));
686                 return -1;
687         }
688         if (!ctdb_db->transaction_active) {
689                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
690                                  "Database 0x%08x has no transaction commit "
691                                  "started\n", client->db_id));
692                 return -1;
693         }
694
695         ctdb_db->transaction_active = false;
696         client->db_id = 0;
697
698         if (client->num_persistent_updates == 0) {
699                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
700                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
701                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
702                 return -1;
703         }
704         client->num_persistent_updates--;
705
706         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
707                             "transaction commit db_id[0x%08x]\n",
708                             client->client_id, ctdb_db->db_id));
709
710         return 0;
711 }
712
713 /*
714   called when a client gets an error committing its database
715   during a transaction commit
716  */
717 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
718                                   struct ctdb_req_control *c)
719 {
720         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
721         struct ctdb_db_context *ctdb_db;
722
723         ctdb_db = find_ctdb_db(ctdb, client->db_id);
724         if (ctdb_db == NULL) {
725                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
726                                  "Unknown database 0x%08x\n", client->db_id));
727                 return -1;
728         }
729         if (!ctdb_db->transaction_active) {
730                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
731                                  "Database 0x%08x has no transaction commit "
732                                  "started\n", client->db_id));
733                 return -1;
734         }
735
736         ctdb_db->transaction_active = false;
737         client->db_id = 0;
738
739         if (client->num_persistent_updates == 0) {
740                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
741         } else {
742                 client->num_persistent_updates--;
743         }
744
745         DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
746                          " db_id[0x%08x] - forcing recovery\n",
747                          ctdb_db->db_id));
748         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
749
750         return 0;
751 }
752
753 /**
754  * Tell whether a transaction is active on this node on the give DB.
755  */
756 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
757                                    struct ctdb_req_control *c,
758                                    uint32_t db_id)
759 {
760         struct ctdb_db_context *ctdb_db;
761         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
762
763         ctdb_db = find_ctdb_db(ctdb, db_id);
764         if (!ctdb_db) {
765                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
766                 return -1;
767         }
768
769         if (client->db_id == db_id) {
770                 return 0;
771         }
772
773         if (ctdb_db->transaction_active) {
774                 return 1;
775         } else {
776                 return 0;
777         }
778 }
779
780 /*
781   backwards compatibility:
782
783   start a persistent store operation. passing both the key, header and
784   data to the daemon. If the client disconnects before it has issued
785   a persistent_update call to the daemon we trigger a full recovery
786   to ensure the databases are brought back in sync.
787   for now we ignore the recdata that the client has passed to us.
788  */
789 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
790                                       struct ctdb_req_control *c,
791                                       TDB_DATA recdata)
792 {
793         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
794
795         if (client == NULL) {
796                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
797                 return -1;
798         }
799
800         client->num_persistent_updates++;
801
802         return 0;
803 }
804
805 /* 
806   backwards compatibility:
807
808   called to tell ctdbd that it is no longer doing a persistent update 
809 */
810 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
811                                               struct ctdb_req_control *c,
812                                               TDB_DATA recdata)
813 {
814         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
815
816         if (client == NULL) {
817                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
818                 return -1;
819         }
820
821         if (client->num_persistent_updates > 0) {
822                 client->num_persistent_updates--;
823         }
824
825         return 0;
826 }
827
828
829 /*
830   backwards compatibility:
831
832   single record varient of ctdb_control_trans2_commit for older clients
833  */
834 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
835                                       struct ctdb_req_control *c, 
836                                       TDB_DATA recdata, bool *async_reply)
837 {
838         struct ctdb_marshall_buffer *m;
839         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
840         TDB_DATA key, data;
841
842         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
843             rec->keylen + rec->datalen) {
844                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
845                 return -1;
846         }
847
848         key.dptr = &rec->data[0];
849         key.dsize = rec->keylen;
850         data.dptr = &rec->data[rec->keylen];
851         data.dsize = rec->datalen;
852
853         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
854         CTDB_NO_MEMORY(ctdb, m);
855
856         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
857 }
858
859 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
860                                   uint32_t db_id,
861                                   uint64_t *seqnum)
862 {
863         int32_t ret;
864         struct ctdb_db_context *ctdb_db;
865         const char *keyname = CTDB_DB_SEQNUM_KEY;
866         TDB_DATA key;
867         TDB_DATA data;
868         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
869
870         ctdb_db = find_ctdb_db(ctdb, db_id);
871         if (!ctdb_db) {
872                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
873                 ret = -1;
874                 goto done;
875         }
876
877         key.dptr = (uint8_t *)discard_const(keyname);
878         key.dsize = strlen(keyname) + 1;
879
880         ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
881         if (ret != 0) {
882                 goto done;
883         }
884
885         if (data.dsize != sizeof(uint64_t)) {
886                 *seqnum = 0;
887                 goto done;
888         }
889
890         *seqnum = *(uint64_t *)data.dptr;
891
892 done:
893         talloc_free(mem_ctx);
894         return ret;
895 }
896
897 /**
898  * Get the sequence number of a persistent database.
899  */
900 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
901                                    TDB_DATA indata,
902                                    TDB_DATA *outdata)
903 {
904         uint32_t db_id;
905         int32_t ret;
906         uint64_t seqnum;
907
908         db_id = *(uint32_t *)indata.dptr;
909         ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
910         if (ret != 0) {
911                 goto done;
912         }
913
914         outdata->dsize = sizeof(uint64_t);
915         outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
916         if (outdata->dptr == NULL) {
917                 ret = -1;
918                 goto done;
919         }
920
921         *(outdata->dptr) = seqnum;
922
923 done:
924         return ret;
925 }