e299c9fac3ac659f0941edc0bbd12eab46caa76d
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  bookkeeping for one commit request while its
  CTDB_CONTROL_UPDATE_RECORD controls are outstanding
 */
struct ctdb_persistent_state {
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db; /* used by trans3_commit */
        struct ctdb_req_control *c;      /* control that gets the final reply */
        const char *errormsg;            /* message of the last failed reply */
        uint32_t num_pending;            /* +1 per control sent, -1 per successful reply */
        int32_t status;                  /* status of the last failed reply */
        uint32_t num_failed;             /* replies that carried a non-zero status */
        uint32_t num_sent;               /* controls handed to the transport */
};
38
39 /*
40   1) all nodes fail, and all nodes reply
41   2) some nodes fail, all nodes reply
42   3) some nodes timeout
43   4) all nodes succeed
44  */
45
46 /*
47   called when a node has acknowledged a ctdb_control_update_record call
48  */
49 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
50                                      int32_t status, TDB_DATA data, 
51                                      const char *errormsg,
52                                      void *private_data)
53 {
54         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
55                                                               struct ctdb_persistent_state);
56         enum ctdb_trans2_commit_error etype;
57
58         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
59                 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
60                                    "during recovery\n"));
61                 return;
62         }
63
64         if (status != 0) {
65                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
66                          status, errormsg?errormsg:"no error message given"));
67                 state->status = status;
68                 state->errormsg = errormsg;
69                 state->num_failed++;
70
71                 /*
72                  * If a node failed to complete the update_record control,
73                  * then either a recovery is already running or something
74                  * bad is going on. So trigger a recovery and let the
75                  * recovery finish the transaction, sending back the reply
76                  * for the trans3_commit control to the client.
77                  */
78                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
79                 return;
80         }
81
82         state->num_pending--;
83
84         if (state->num_pending != 0) {
85                 return;
86         }
87
88         if (state->num_failed == state->num_sent) {
89                 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
90         } else if (state->num_failed != 0) {
91                 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
92         } else {
93                 etype = CTDB_TRANS2_COMMIT_SUCCESS;
94         }
95
96         ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
97         talloc_free(state);
98 }
99
100 /*
101   called if persistent store times out
102  */
103 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
104                                          struct timeval t, void *private_data)
105 {
106         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
107
108         if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
109                 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
110                                    "timeout during recovery\n"));
111                 return;
112         }
113
114         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
115                                    "timeout in ctdb_persistent_state");
116
117         talloc_free(state);
118 }
119
120 /*
121   store a set of persistent records - called from a ctdb client when it has updated
122   some records in a persistent database. The client will have the record
123   locked for the duration of this call. The client is the dmaster when 
124   this call is made
125  */
126 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
127                                    struct ctdb_req_control *c, 
128                                    TDB_DATA recdata, bool *async_reply)
129 {
130         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
131         struct ctdb_persistent_state *state;
132         int i;
133         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
134         struct ctdb_db_context *ctdb_db;
135
136         ctdb_db = find_ctdb_db(ctdb, m->db_id);
137         if (ctdb_db == NULL) {
138                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
139                                  "Unknown database db_id[0x%08x]\n", m->db_id));
140                 return -1;
141         }
142
143         if (client == NULL) {
144                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
145                 return -1;
146         }
147
148         if (ctdb_db->unhealthy_reason) {
149                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
150                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
151                 return -1;
152         }
153
154         /* handling num_persistent_updates is a bit strange - 
155            there are 3 cases
156              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
157                 They don't expect num_persistent_updates to be used at all
158
159              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
160                 this commit to then decrement it
161
162              3) new clients which use TRANS2 commit functions, and
163                 expect this function to increment the counter, and
164                 then have it decremented in ctdb_control_trans2_error
165                 or ctdb_control_trans2_finished
166         */
167         switch (c->opcode) {
168         case CTDB_CONTROL_PERSISTENT_STORE:
169                 if (ctdb_db->transaction_active) {
170                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
171                                           "transaction is active on database "
172                                           "db_id[0x%08x] - refusing persistent "
173                                          " store for client id[0x%08x]\n",
174                                           ctdb_db->db_id, client->client_id));
175                         return -1;
176                 }
177                 if (client->num_persistent_updates > 0) {
178                         client->num_persistent_updates--;
179                 }
180                 break;
181         case CTDB_CONTROL_TRANS2_COMMIT:
182                 if (ctdb_db->transaction_active) {
183                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
184                                          " already a transaction commit "
185                                          "active on db_id[0x%08x] - forbidding "
186                                          "client_id[0x%08x] to commit\n",
187                                          ctdb_db->db_id, client->client_id));
188                         return -1;
189                 }
190                 if (client->db_id != 0) {
191                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
192                                          "client-db_id[0x%08x] != 0 "
193                                          "(client_id[0x%08x])\n",
194                                          client->db_id, client->client_id));
195                         return -1;
196                 }
197                 client->num_persistent_updates++;
198                 ctdb_db->transaction_active = true;
199                 client->db_id = m->db_id;
200                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
201                                   " commit transaction on db id[0x%08x]\n",
202                                   client->client_id, client->db_id));
203                 break;
204         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
205                 /* already updated from the first commit */
206                 if (client->db_id != m->db_id) {
207                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
208                                          "retry: client-db_id[0x%08x] != "
209                                          "db_id[0x%08x] (client_id[0x%08x])\n",
210                                          client->db_id,
211                                          m->db_id, client->client_id));
212                         return -1;
213                 }
214                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
215                                     "transaction commit retry on "
216                                     "db_id[0x%08x]\n",
217                                     client->client_id, client->db_id));
218                 break;
219         }
220
221         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
222                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
223                 return -1;
224         }
225
226         state = talloc_zero(ctdb, struct ctdb_persistent_state);
227         CTDB_NO_MEMORY(ctdb, state);
228
229         state->ctdb = ctdb;
230         state->c    = c;
231
232         for (i=0;i<ctdb->vnn_map->size;i++) {
233                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
234                 int ret;
235
236                 /* only send to active nodes */
237                 if (node->flags & NODE_FLAGS_INACTIVE) {
238                         continue;
239                 }
240
241                 /* don't send to ourselves */
242                 if (node->pnn == ctdb->pnn) {
243                         continue;
244                 }
245                 
246                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
247                                                c->client_id, 0, recdata, 
248                                                ctdb_persistent_callback, state);
249                 if (ret == -1) {
250                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
251                         talloc_free(state);
252                         return -1;
253                 }
254
255                 state->num_pending++;
256                 state->num_sent++;
257         }
258
259         if (state->num_pending == 0) {
260                 talloc_free(state);
261                 return 0;
262         }
263         
264         /* we need to wait for the replies */
265         *async_reply = true;
266
267         /* need to keep the control structure around */
268         talloc_steal(state, c);
269
270         /* but we won't wait forever */
271         event_add_timed(ctdb->ev, state, 
272                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
273                         ctdb_persistent_store_timeout, state);
274
275         return 0;
276 }
277
278 static int ctdb_persistent_state_destructor(struct ctdb_persistent_state *state)
279 {
280         if (state->ctdb_db != NULL) {
281                 state->ctdb_db->persistent_state = NULL;
282         }
283
284         return 0;
285 }
286
287 /*
288  * Store a set of persistent records.
289  * This is used to roll out a transaction to all nodes.
290  */
291 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
292                                    struct ctdb_req_control *c,
293                                    TDB_DATA recdata, bool *async_reply)
294 {
295         struct ctdb_client *client;
296         struct ctdb_persistent_state *state;
297         int i;
298         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
299         struct ctdb_db_context *ctdb_db;
300
301         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
302                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
303                 return -1;
304         }
305
306         ctdb_db = find_ctdb_db(ctdb, m->db_id);
307         if (ctdb_db == NULL) {
308                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
309                                  "Unknown database db_id[0x%08x]\n", m->db_id));
310                 return -1;
311         }
312
313         client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
314         if (client == NULL) {
315                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
316                                  "to a client. Returning error\n"));
317                 return -1;
318         }
319
320         ctdb_db->persistent_state = talloc_zero(ctdb_db,
321                                                 struct ctdb_persistent_state);
322         CTDB_NO_MEMORY(ctdb, ctdb_db->persistent_state);
323
324         state = ctdb_db->persistent_state;
325         state->ctdb = ctdb;
326         state->ctdb_db = ctdb_db;
327         state->c    = c;
328         talloc_set_destructor(state, ctdb_persistent_state_destructor);
329
330         for (i = 0; i < ctdb->vnn_map->size; i++) {
331                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
332                 int ret;
333
334                 /* only send to active nodes */
335                 if (node->flags & NODE_FLAGS_INACTIVE) {
336                         continue;
337                 }
338
339                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
340                                                CTDB_CONTROL_UPDATE_RECORD,
341                                                c->client_id, 0, recdata,
342                                                ctdb_persistent_callback,
343                                                state);
344                 if (ret == -1) {
345                         DEBUG(DEBUG_ERR,("Unable to send "
346                                          "CTDB_CONTROL_UPDATE_RECORD "
347                                          "to pnn %u\n", node->pnn));
348                         talloc_free(state);
349                         return -1;
350                 }
351
352                 state->num_pending++;
353                 state->num_sent++;
354         }
355
356         if (state->num_pending == 0) {
357                 talloc_free(state);
358                 return 0;
359         }
360
361         /* we need to wait for the replies */
362         *async_reply = true;
363
364         /* need to keep the control structure around */
365         talloc_steal(state, c);
366
367         /* but we won't wait forever */
368         event_add_timed(ctdb->ev, state,
369                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
370                         ctdb_persistent_store_timeout, state);
371
372         return 0;
373 }
374
375
/*
  context of one CTDB_CONTROL_UPDATE_RECORD request while the write is
  carried out by a child process
 */
struct ctdb_persistent_write_state {
        struct ctdb_db_context      *ctdb_db; /* database being written */
        struct ctdb_marshall_buffer *m;       /* marshalled records to store */
        struct ctdb_req_control     *c;       /* control that gets the reply */
};
381
382
383 /*
384   called from a child process to write the data
385  */
386 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
387 {
388         int ret, i;
389         struct ctdb_rec_data *rec = NULL;
390         struct ctdb_marshall_buffer *m = state->m;
391
392         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
393         if (ret == -1) {
394                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
395                                  state->ctdb_db->db_id));
396                 return -1;
397         }
398
399         for (i=0;i<m->count;i++) {
400                 struct ctdb_ltdb_header oldheader;
401                 struct ctdb_ltdb_header header;
402                 TDB_DATA key, data, olddata;
403                 TALLOC_CTX *tmp_ctx = talloc_new(state);
404
405                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
406                 
407                 if (rec == NULL) {
408                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
409                                          i, state->ctdb_db->db_id));
410                         talloc_free(tmp_ctx);
411                         goto failed;                    
412                 }
413
414                 /* fetch the old header and ensure the rsn is less than the new rsn */
415                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
416                 if (ret != 0) {
417                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
418                                          state->ctdb_db->db_id));
419                         talloc_free(tmp_ctx);
420                         goto failed;
421                 }
422
423                 if (oldheader.rsn >= header.rsn &&
424                     (olddata.dsize != data.dsize || 
425                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
426                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
427                                           state->ctdb_db->db_id, 
428                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
429                         talloc_free(tmp_ctx);
430                         goto failed;
431                 }
432
433                 talloc_free(tmp_ctx);
434
435                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
436                 if (ret != 0) {
437                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
438                                           state->ctdb_db->db_id));
439                         goto failed;
440                 }
441         }
442
443         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
444         if (ret == -1) {
445                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
446                                  state->ctdb_db->db_id));
447                 return -1;
448         }
449
450         return 0;
451         
452 failed:
453         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
454         return -1;
455 }
456
457
458 /*
459   called when we the child has completed the persistent write
460   on our behalf
461  */
462 static void ctdb_persistent_write_callback(int status, void *private_data)
463 {
464         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
465                                                                    struct ctdb_persistent_write_state);
466
467
468         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
469
470         talloc_free(state);
471 }
472
473 /*
474   called if our lockwait child times out
475  */
476 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
477                                          struct timeval t, void *private_data)
478 {
479         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
480                                                                    struct ctdb_persistent_write_state);
481         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
482         talloc_free(state);
483 }
484
/*
  handle for one forked write child and the pipe it reports back on
 */
struct childwrite_handle {
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
        struct fd_event *fde;            /* read event on fd[0] */
        int fd[2];                       /* pipe: parent reads [0], child writes [1] */
        pid_t child;                     /* pid of the write child */
        void *private_data;              /* passed through to callback */
        void (*callback)(int, void *);   /* called with the child's status byte */
        struct timeval start_time;       /* used for the childwrite latency */
};
495
496 static int childwrite_destructor(struct childwrite_handle *h)
497 {
498         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
499         kill(h->child, SIGKILL);
500         return 0;
501 }
502
503 /* called when the child process has finished writing the record to the
504    database
505 */
506 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
507                              uint16_t flags, void *private_data)
508 {
509         struct childwrite_handle *h = talloc_get_type(private_data, 
510                                                      struct childwrite_handle);
511         void *p = h->private_data;
512         void (*callback)(int, void *) = h->callback;
513         pid_t child = h->child;
514         TALLOC_CTX *tmp_ctx = talloc_new(ev);
515         int ret;
516         char c;
517
518         CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
519         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
520
521         /* the handle needs to go away when the context is gone - when
522            the handle goes away this implicitly closes the pipe, which
523            kills the child */
524         talloc_steal(tmp_ctx, h);
525
526         talloc_set_destructor(h, NULL);
527
528         ret = read(h->fd[0], &c, 1);
529         if (ret < 1) {
530                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
531                 c = 1;
532         }
533
534         callback(c, p);
535
536         kill(child, SIGKILL);
537         talloc_free(tmp_ctx);
538 }
539
540 /* this creates a child process which will take out a tdb transaction
541    and write the record to the database.
542 */
543 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
544                                 void (*callback)(int, void *private_data),
545                                 struct ctdb_persistent_write_state *state)
546 {
547         struct childwrite_handle *result;
548         int ret;
549         pid_t parent = getpid();
550
551         CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
552         CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
553
554         if (!(result = talloc_zero(state, struct childwrite_handle))) {
555                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
556                 return NULL;
557         }
558
559         ret = pipe(result->fd);
560
561         if (ret != 0) {
562                 talloc_free(result);
563                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
564                 return NULL;
565         }
566
567         result->child = ctdb_fork(ctdb_db->ctdb);
568
569         if (result->child == (pid_t)-1) {
570                 close(result->fd[0]);
571                 close(result->fd[1]);
572                 talloc_free(result);
573                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
574                 return NULL;
575         }
576
577         result->callback = callback;
578         result->private_data = state;
579         result->ctdb = ctdb_db->ctdb;
580         result->ctdb_db = ctdb_db;
581
582         if (result->child == 0) {
583                 char c = 0;
584
585                 close(result->fd[0]);
586                 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
587                 ret = ctdb_persistent_store(state);
588                 if (ret != 0) {
589                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
590                         c = 1;
591                 }
592
593                 write(result->fd[1], &c, 1);
594
595                 /* make sure we die when our parent dies */
596                 while (kill(parent, 0) == 0 || errno != ESRCH) {
597                         sleep(5);
598                 }
599                 _exit(0);
600         }
601
602         close(result->fd[1]);
603         set_close_on_exec(result->fd[0]);
604
605         talloc_set_destructor(result, childwrite_destructor);
606
607         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
608
609         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
610                                    EVENT_FD_READ, childwrite_handler,
611                                    (void *)result);
612         if (result->fde == NULL) {
613                 talloc_free(result);
614                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
615                 return NULL;
616         }
617         tevent_fd_set_auto_close(result->fde);
618
619         result->start_time = timeval_current();
620
621         return result;
622 }
623
624 /* 
625    update a record on this node if the new record has a higher rsn than the
626    current record
627  */
628 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
629                                    struct ctdb_req_control *c, TDB_DATA recdata, 
630                                    bool *async_reply)
631 {
632         struct ctdb_db_context *ctdb_db;
633         struct ctdb_persistent_write_state *state;
634         struct childwrite_handle *handle;
635         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
636
637         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
638                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
639                 return -1;
640         }
641
642         ctdb_db = find_ctdb_db(ctdb, m->db_id);
643         if (ctdb_db == NULL) {
644                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
645                 return -1;
646         }
647
648         if (ctdb_db->unhealthy_reason) {
649                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
650                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
651                 return -1;
652         }
653
654         state = talloc(ctdb, struct ctdb_persistent_write_state);
655         CTDB_NO_MEMORY(ctdb, state);
656
657         state->ctdb_db = ctdb_db;
658         state->c       = c;
659         state->m       = m;
660
661         /* create a child process to take out a transaction and 
662            write the data.
663         */
664         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
665         if (handle == NULL) {
666                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
667                 talloc_free(state);
668                 return -1;
669         }
670
671         /* we need to wait for the replies */
672         *async_reply = true;
673
674         /* need to keep the control structure around */
675         talloc_steal(state, c);
676
677         /* but we won't wait forever */
678         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
679                         ctdb_persistent_lock_timeout, state);
680
681         return 0;
682 }
683
684
685 /*
686   called when a client has finished a local commit in a transaction to 
687   a persistent database
688  */
689 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
690                                      struct ctdb_req_control *c)
691 {
692         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
693         struct ctdb_db_context *ctdb_db;
694
695         ctdb_db = find_ctdb_db(ctdb, client->db_id);
696         if (ctdb_db == NULL) {
697                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
698                                  "Unknown database 0x%08x\n", client->db_id));
699                 return -1;
700         }
701         if (!ctdb_db->transaction_active) {
702                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
703                                  "Database 0x%08x has no transaction commit "
704                                  "started\n", client->db_id));
705                 return -1;
706         }
707
708         ctdb_db->transaction_active = false;
709         client->db_id = 0;
710
711         if (client->num_persistent_updates == 0) {
712                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
713                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
714                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
715                 return -1;
716         }
717         client->num_persistent_updates--;
718
719         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
720                             "transaction commit db_id[0x%08x]\n",
721                             client->client_id, ctdb_db->db_id));
722
723         return 0;
724 }
725
726 /*
727   called when a client gets an error committing its database
728   during a transaction commit
729  */
730 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
731                                   struct ctdb_req_control *c)
732 {
733         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
734         struct ctdb_db_context *ctdb_db;
735
736         ctdb_db = find_ctdb_db(ctdb, client->db_id);
737         if (ctdb_db == NULL) {
738                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
739                                  "Unknown database 0x%08x\n", client->db_id));
740                 return -1;
741         }
742         if (!ctdb_db->transaction_active) {
743                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
744                                  "Database 0x%08x has no transaction commit "
745                                  "started\n", client->db_id));
746                 return -1;
747         }
748
749         ctdb_db->transaction_active = false;
750         client->db_id = 0;
751
752         if (client->num_persistent_updates == 0) {
753                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
754         } else {
755                 client->num_persistent_updates--;
756         }
757
758         DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
759                          " db_id[0x%08x] - forcing recovery\n",
760                          ctdb_db->db_id));
761         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
762
763         return 0;
764 }
765
766 /**
767  * Tell whether a transaction is active on this node on the give DB.
768  */
769 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
770                                    struct ctdb_req_control *c,
771                                    uint32_t db_id)
772 {
773         struct ctdb_db_context *ctdb_db;
774         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
775
776         ctdb_db = find_ctdb_db(ctdb, db_id);
777         if (!ctdb_db) {
778                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
779                 return -1;
780         }
781
782         if (client->db_id == db_id) {
783                 return 0;
784         }
785
786         if (ctdb_db->transaction_active) {
787                 return 1;
788         } else {
789                 return 0;
790         }
791 }
792
793 /*
794   backwards compatibility:
795
796   start a persistent store operation. passing both the key, header and
797   data to the daemon. If the client disconnects before it has issued
798   a persistent_update call to the daemon we trigger a full recovery
799   to ensure the databases are brought back in sync.
800   for now we ignore the recdata that the client has passed to us.
801  */
802 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
803                                       struct ctdb_req_control *c,
804                                       TDB_DATA recdata)
805 {
806         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
807
808         if (client == NULL) {
809                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
810                 return -1;
811         }
812
813         client->num_persistent_updates++;
814
815         return 0;
816 }
817
818 /* 
819   backwards compatibility:
820
821   called to tell ctdbd that it is no longer doing a persistent update 
822 */
823 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
824                                               struct ctdb_req_control *c,
825                                               TDB_DATA recdata)
826 {
827         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
828
829         if (client == NULL) {
830                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
831                 return -1;
832         }
833
834         if (client->num_persistent_updates > 0) {
835                 client->num_persistent_updates--;
836         }
837
838         return 0;
839 }
840
841
842 /*
843   backwards compatibility:
844
845   single record varient of ctdb_control_trans2_commit for older clients
846  */
847 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
848                                       struct ctdb_req_control *c, 
849                                       TDB_DATA recdata, bool *async_reply)
850 {
851         struct ctdb_marshall_buffer *m;
852         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
853         TDB_DATA key, data;
854
855         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
856             rec->keylen + rec->datalen) {
857                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
858                 return -1;
859         }
860
861         key.dptr = &rec->data[0];
862         key.dsize = rec->keylen;
863         data.dptr = &rec->data[rec->keylen];
864         data.dsize = rec->datalen;
865
866         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
867         CTDB_NO_MEMORY(ctdb, m);
868
869         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
870 }
871
872 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
873                                   uint32_t db_id,
874                                   uint64_t *seqnum)
875 {
876         int32_t ret;
877         struct ctdb_db_context *ctdb_db;
878         const char *keyname = CTDB_DB_SEQNUM_KEY;
879         TDB_DATA key;
880         TDB_DATA data;
881         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
882
883         ctdb_db = find_ctdb_db(ctdb, db_id);
884         if (!ctdb_db) {
885                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
886                 ret = -1;
887                 goto done;
888         }
889
890         key.dptr = (uint8_t *)discard_const(keyname);
891         key.dsize = strlen(keyname) + 1;
892
893         ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
894         if (ret != 0) {
895                 goto done;
896         }
897
898         if (data.dsize != sizeof(uint64_t)) {
899                 *seqnum = 0;
900                 goto done;
901         }
902
903         *seqnum = *(uint64_t *)data.dptr;
904
905 done:
906         talloc_free(mem_ctx);
907         return ret;
908 }
909
910 /**
911  * Get the sequence number of a persistent database.
912  */
913 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
914                                    TDB_DATA indata,
915                                    TDB_DATA *outdata)
916 {
917         uint32_t db_id;
918         int32_t ret;
919         uint64_t seqnum;
920
921         db_id = *(uint32_t *)indata.dptr;
922         ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
923         if (ret != 0) {
924                 goto done;
925         }
926
927         outdata->dsize = sizeof(uint64_t);
928         outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
929         if (outdata->dptr == NULL) {
930                 ret = -1;
931                 goto done;
932         }
933
934         *(outdata->dptr) = seqnum;
935
936 done:
937         return ret;
938 }