c7a53ca066f56b029b25593edf6bdf8768ae1578
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  Book-keeping for one persistent commit that is being rolled out to
  other nodes with CTDB_CONTROL_UPDATE_RECORD.
 */
struct ctdb_persistent_state {
        /* daemon context the commit runs in */
        struct ctdb_context *ctdb;

        /* database whose persistent_state pointer refers to this
           object; only set (and used) by trans3_commit */
        struct ctdb_db_context *ctdb_db;

        /* the client's commit control, replied to once the outcome
           of the rollout is known */
        struct ctdb_req_control *c;

        /* error message of the most recent failed reply */
        const char *errormsg;

        /* controls sent that have not been answered successfully yet */
        uint32_t num_pending;

        /* status of the most recent failed reply */
        int32_t status;

        /* number of failed replies seen so far */
        uint32_t num_failed;

        /* number of update_record controls sent */
        uint32_t num_sent;
};
38
39 /*
40   1) all nodes fail, and all nodes reply
41   2) some nodes fail, all nodes reply
42   3) some nodes timeout
43   4) all nodes succeed
44  */
45
46 /*
47   called when a node has acknowledged a ctdb_control_update_record call
48  */
49 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
50                                      int32_t status, TDB_DATA data, 
51                                      const char *errormsg,
52                                      void *private_data)
53 {
54         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
55                                                               struct ctdb_persistent_state);
56         enum ctdb_trans2_commit_error etype;
57
58         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
59                 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
60                                    "during recovery\n"));
61                 return;
62         }
63
64         if (status != 0) {
65                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
66                          status, errormsg?errormsg:"no error message given"));
67                 state->status = status;
68                 state->errormsg = errormsg;
69                 state->num_failed++;
70
71                 /*
72                  * If a node failed to complete the update_record control,
73                  * then either a recovery is already running or something
74                  * bad is going on. So trigger a recovery and let the
75                  * recovery finish the transaction, sending back the reply
76                  * for the trans3_commit control to the client.
77                  */
78                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
79                 return;
80         }
81
82         state->num_pending--;
83
84         if (state->num_pending != 0) {
85                 return;
86         }
87
88         if (state->num_failed == state->num_sent) {
89                 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
90         } else if (state->num_failed != 0) {
91                 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
92         } else {
93                 etype = CTDB_TRANS2_COMMIT_SUCCESS;
94         }
95
96         ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
97         talloc_free(state);
98 }
99
100 /*
101   called if persistent store times out
102  */
103 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
104                                          struct timeval t, void *private_data)
105 {
106         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
107
108         if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
109                 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
110                                    "timeout during recovery\n"));
111                 return;
112         }
113
114         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
115                                    "timeout in ctdb_persistent_state");
116
117         talloc_free(state);
118 }
119
120 /*
121   store a set of persistent records - called from a ctdb client when it has updated
122   some records in a persistent database. The client will have the record
123   locked for the duration of this call. The client is the dmaster when 
124   this call is made
125  */
126 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
127                                    struct ctdb_req_control *c, 
128                                    TDB_DATA recdata, bool *async_reply)
129 {
130         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
131         struct ctdb_persistent_state *state;
132         int i;
133         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
134         struct ctdb_db_context *ctdb_db;
135
136         ctdb_db = find_ctdb_db(ctdb, m->db_id);
137         if (ctdb_db == NULL) {
138                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
139                                  "Unknown database db_id[0x%08x]\n", m->db_id));
140                 return -1;
141         }
142
143         if (client == NULL) {
144                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
145                 return -1;
146         }
147
148         if (ctdb_db->unhealthy_reason) {
149                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
150                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
151                 return -1;
152         }
153
154         /* handling num_persistent_updates is a bit strange - 
155            there are 3 cases
156              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
157                 They don't expect num_persistent_updates to be used at all
158
159              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
160                 this commit to then decrement it
161
162              3) new clients which use TRANS2 commit functions, and
163                 expect this function to increment the counter, and
164                 then have it decremented in ctdb_control_trans2_error
165                 or ctdb_control_trans2_finished
166         */
167         switch (c->opcode) {
168         case CTDB_CONTROL_PERSISTENT_STORE:
169                 if (ctdb_db->transaction_active) {
170                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
171                                           "transaction is active on database "
172                                           "db_id[0x%08x] - refusing persistent "
173                                          " store for client id[0x%08x]\n",
174                                           ctdb_db->db_id, client->client_id));
175                         return -1;
176                 }
177                 if (client->num_persistent_updates > 0) {
178                         client->num_persistent_updates--;
179                 }
180                 break;
181         case CTDB_CONTROL_TRANS2_COMMIT:
182                 if (ctdb_db->transaction_active) {
183                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
184                                          " already a transaction commit "
185                                          "active on db_id[0x%08x] - forbidding "
186                                          "client_id[0x%08x] to commit\n",
187                                          ctdb_db->db_id, client->client_id));
188                         return -1;
189                 }
190                 if (client->db_id != 0) {
191                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
192                                          "client-db_id[0x%08x] != 0 "
193                                          "(client_id[0x%08x])\n",
194                                          client->db_id, client->client_id));
195                         return -1;
196                 }
197                 client->num_persistent_updates++;
198                 ctdb_db->transaction_active = true;
199                 client->db_id = m->db_id;
200                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
201                                   " commit transaction on db id[0x%08x]\n",
202                                   client->client_id, client->db_id));
203                 break;
204         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
205                 /* already updated from the first commit */
206                 if (client->db_id != m->db_id) {
207                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
208                                          "retry: client-db_id[0x%08x] != "
209                                          "db_id[0x%08x] (client_id[0x%08x])\n",
210                                          client->db_id,
211                                          m->db_id, client->client_id));
212                         return -1;
213                 }
214                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
215                                     "transaction commit retry on "
216                                     "db_id[0x%08x]\n",
217                                     client->client_id, client->db_id));
218                 break;
219         }
220
221         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
222                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
223                 return -1;
224         }
225
226         state = talloc_zero(ctdb, struct ctdb_persistent_state);
227         CTDB_NO_MEMORY(ctdb, state);
228
229         state->ctdb = ctdb;
230         state->c    = c;
231
232         for (i=0;i<ctdb->vnn_map->size;i++) {
233                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
234                 int ret;
235
236                 /* only send to active nodes */
237                 if (node->flags & NODE_FLAGS_INACTIVE) {
238                         continue;
239                 }
240
241                 /* don't send to ourselves */
242                 if (node->pnn == ctdb->pnn) {
243                         continue;
244                 }
245                 
246                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
247                                                c->client_id, 0, recdata, 
248                                                ctdb_persistent_callback, state);
249                 if (ret == -1) {
250                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
251                         talloc_free(state);
252                         return -1;
253                 }
254
255                 state->num_pending++;
256                 state->num_sent++;
257         }
258
259         if (state->num_pending == 0) {
260                 talloc_free(state);
261                 return 0;
262         }
263         
264         /* we need to wait for the replies */
265         *async_reply = true;
266
267         /* need to keep the control structure around */
268         talloc_steal(state, c);
269
270         /* but we won't wait forever */
271         event_add_timed(ctdb->ev, state, 
272                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
273                         ctdb_persistent_store_timeout, state);
274
275         return 0;
276 }
277
278 static int ctdb_persistent_state_destructor(struct ctdb_persistent_state *state)
279 {
280         if (state->ctdb_db != NULL) {
281                 state->ctdb_db->persistent_state = NULL;
282         }
283
284         return 0;
285 }
286
287 /*
288  * Store a set of persistent records.
289  * This is used to roll out a transaction to all nodes.
290  */
291 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
292                                    struct ctdb_req_control *c,
293                                    TDB_DATA recdata, bool *async_reply)
294 {
295         struct ctdb_client *client;
296         struct ctdb_persistent_state *state;
297         int i;
298         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
299         struct ctdb_db_context *ctdb_db;
300
301         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
302                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
303                 return -1;
304         }
305
306         ctdb_db = find_ctdb_db(ctdb, m->db_id);
307         if (ctdb_db == NULL) {
308                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
309                                  "Unknown database db_id[0x%08x]\n", m->db_id));
310                 return -1;
311         }
312
313         if (ctdb_db->persistent_state != NULL) {
314                 DEBUG(DEBUG_ERR, (__location__ " Error: "
315                                   "ctdb_control_trans3_commit "
316                                   "called while a transaction commit is "
317                                   "active. db_id[0x%08x]\n", m->db_id));
318                 return -1;
319         }
320
321         client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
322         if (client == NULL) {
323                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
324                                  "to a client. Returning error\n"));
325                 return -1;
326         }
327
328         ctdb_db->persistent_state = talloc_zero(ctdb_db,
329                                                 struct ctdb_persistent_state);
330         CTDB_NO_MEMORY(ctdb, ctdb_db->persistent_state);
331
332         state = ctdb_db->persistent_state;
333         state->ctdb = ctdb;
334         state->ctdb_db = ctdb_db;
335         state->c    = c;
336         talloc_set_destructor(state, ctdb_persistent_state_destructor);
337
338         for (i = 0; i < ctdb->vnn_map->size; i++) {
339                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
340                 int ret;
341
342                 /* only send to active nodes */
343                 if (node->flags & NODE_FLAGS_INACTIVE) {
344                         continue;
345                 }
346
347                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
348                                                CTDB_CONTROL_UPDATE_RECORD,
349                                                c->client_id, 0, recdata,
350                                                ctdb_persistent_callback,
351                                                state);
352                 if (ret == -1) {
353                         DEBUG(DEBUG_ERR,("Unable to send "
354                                          "CTDB_CONTROL_UPDATE_RECORD "
355                                          "to pnn %u\n", node->pnn));
356                         talloc_free(state);
357                         return -1;
358                 }
359
360                 state->num_pending++;
361                 state->num_sent++;
362         }
363
364         if (state->num_pending == 0) {
365                 talloc_free(state);
366                 return 0;
367         }
368
369         /* we need to wait for the replies */
370         *async_reply = true;
371
372         /* need to keep the control structure around */
373         talloc_steal(state, c);
374
375         /* but we won't wait forever */
376         event_add_timed(ctdb->ev, state,
377                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
378                         ctdb_persistent_store_timeout, state);
379
380         return 0;
381 }
382
383
/*
  everything the write child and its completion/timeout handlers need
  for one CTDB_CONTROL_UPDATE_RECORD request
 */
struct ctdb_persistent_write_state {
        /* database the records are written to */
        struct ctdb_db_context *ctdb_db;

        /* marshalled records received with the control */
        struct ctdb_marshall_buffer *m;

        /* the update_record control that is replied to when done */
        struct ctdb_req_control *c;
};
389
390
391 /*
392   called from a child process to write the data
393  */
394 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
395 {
396         int ret, i;
397         struct ctdb_rec_data *rec = NULL;
398         struct ctdb_marshall_buffer *m = state->m;
399
400         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
401         if (ret == -1) {
402                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
403                                  state->ctdb_db->db_id));
404                 return -1;
405         }
406
407         for (i=0;i<m->count;i++) {
408                 struct ctdb_ltdb_header oldheader;
409                 struct ctdb_ltdb_header header;
410                 TDB_DATA key, data, olddata;
411                 TALLOC_CTX *tmp_ctx = talloc_new(state);
412
413                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
414                 
415                 if (rec == NULL) {
416                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
417                                          i, state->ctdb_db->db_id));
418                         talloc_free(tmp_ctx);
419                         goto failed;                    
420                 }
421
422                 /* fetch the old header and ensure the rsn is less than the new rsn */
423                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
424                 if (ret != 0) {
425                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
426                                          state->ctdb_db->db_id));
427                         talloc_free(tmp_ctx);
428                         goto failed;
429                 }
430
431                 if (oldheader.rsn >= header.rsn &&
432                     (olddata.dsize != data.dsize || 
433                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
434                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
435                                           state->ctdb_db->db_id, 
436                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
437                         talloc_free(tmp_ctx);
438                         goto failed;
439                 }
440
441                 talloc_free(tmp_ctx);
442
443                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
444                 if (ret != 0) {
445                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
446                                           state->ctdb_db->db_id));
447                         goto failed;
448                 }
449         }
450
451         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
452         if (ret == -1) {
453                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
454                                  state->ctdb_db->db_id));
455                 return -1;
456         }
457
458         return 0;
459         
460 failed:
461         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
462         return -1;
463 }
464
465
466 /*
467   called when we the child has completed the persistent write
468   on our behalf
469  */
470 static void ctdb_persistent_write_callback(int status, void *private_data)
471 {
472         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
473                                                                    struct ctdb_persistent_write_state);
474
475
476         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
477
478         talloc_free(state);
479 }
480
481 /*
482   called if our lockwait child times out
483  */
484 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
485                                          struct timeval t, void *private_data)
486 {
487         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
488                                                                    struct ctdb_persistent_write_state);
489         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
490         talloc_free(state);
491 }
492
/*
  handle for one write child created by ctdb_childwrite
 */
struct childwrite_handle {
        /* daemon context, used for the statistics counters */
        struct ctdb_context *ctdb;

        /* database the child writes to */
        struct ctdb_db_context *ctdb_db;

        /* fd event watching the read end of the pipe */
        struct fd_event *fde;

        /* pipe from the child: fd[0] is read by the parent,
           fd[1] is written by the child */
        int fd[2];

        /* pid of the write child */
        pid_t child;

        /* second argument handed to the completion callback */
        void *private_data;

        /* completion callback; the first argument is the status
           byte reported by the child */
        void (*callback)(int status, void *private_data);

        /* when the child was started, for latency accounting */
        struct timeval start_time;
};
503
504 static int childwrite_destructor(struct childwrite_handle *h)
505 {
506         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
507         kill(h->child, SIGKILL);
508         return 0;
509 }
510
511 /* called when the child process has finished writing the record to the
512    database
513 */
514 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
515                              uint16_t flags, void *private_data)
516 {
517         struct childwrite_handle *h = talloc_get_type(private_data, 
518                                                      struct childwrite_handle);
519         void *p = h->private_data;
520         void (*callback)(int, void *) = h->callback;
521         pid_t child = h->child;
522         TALLOC_CTX *tmp_ctx = talloc_new(ev);
523         int ret;
524         char c;
525
526         CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
527         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
528
529         /* the handle needs to go away when the context is gone - when
530            the handle goes away this implicitly closes the pipe, which
531            kills the child */
532         talloc_steal(tmp_ctx, h);
533
534         talloc_set_destructor(h, NULL);
535
536         ret = read(h->fd[0], &c, 1);
537         if (ret < 1) {
538                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
539                 c = 1;
540         }
541
542         callback(c, p);
543
544         kill(child, SIGKILL);
545         talloc_free(tmp_ctx);
546 }
547
548 /* this creates a child process which will take out a tdb transaction
549    and write the record to the database.
550 */
551 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
552                                 void (*callback)(int, void *private_data),
553                                 struct ctdb_persistent_write_state *state)
554 {
555         struct childwrite_handle *result;
556         int ret;
557         pid_t parent = getpid();
558
559         CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
560         CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
561
562         if (!(result = talloc_zero(state, struct childwrite_handle))) {
563                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
564                 return NULL;
565         }
566
567         ret = pipe(result->fd);
568
569         if (ret != 0) {
570                 talloc_free(result);
571                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
572                 return NULL;
573         }
574
575         result->child = ctdb_fork(ctdb_db->ctdb);
576
577         if (result->child == (pid_t)-1) {
578                 close(result->fd[0]);
579                 close(result->fd[1]);
580                 talloc_free(result);
581                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
582                 return NULL;
583         }
584
585         result->callback = callback;
586         result->private_data = state;
587         result->ctdb = ctdb_db->ctdb;
588         result->ctdb_db = ctdb_db;
589
590         if (result->child == 0) {
591                 char c = 0;
592
593                 close(result->fd[0]);
594                 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
595                 ret = ctdb_persistent_store(state);
596                 if (ret != 0) {
597                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
598                         c = 1;
599                 }
600
601                 write(result->fd[1], &c, 1);
602
603                 /* make sure we die when our parent dies */
604                 while (kill(parent, 0) == 0 || errno != ESRCH) {
605                         sleep(5);
606                 }
607                 _exit(0);
608         }
609
610         close(result->fd[1]);
611         set_close_on_exec(result->fd[0]);
612
613         talloc_set_destructor(result, childwrite_destructor);
614
615         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
616
617         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
618                                    EVENT_FD_READ, childwrite_handler,
619                                    (void *)result);
620         if (result->fde == NULL) {
621                 talloc_free(result);
622                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
623                 return NULL;
624         }
625         tevent_fd_set_auto_close(result->fde);
626
627         result->start_time = timeval_current();
628
629         return result;
630 }
631
632 /* 
633    update a record on this node if the new record has a higher rsn than the
634    current record
635  */
636 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
637                                    struct ctdb_req_control *c, TDB_DATA recdata, 
638                                    bool *async_reply)
639 {
640         struct ctdb_db_context *ctdb_db;
641         struct ctdb_persistent_write_state *state;
642         struct childwrite_handle *handle;
643         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
644
645         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
646                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
647                 return -1;
648         }
649
650         ctdb_db = find_ctdb_db(ctdb, m->db_id);
651         if (ctdb_db == NULL) {
652                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
653                 return -1;
654         }
655
656         if (ctdb_db->unhealthy_reason) {
657                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
658                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
659                 return -1;
660         }
661
662         state = talloc(ctdb, struct ctdb_persistent_write_state);
663         CTDB_NO_MEMORY(ctdb, state);
664
665         state->ctdb_db = ctdb_db;
666         state->c       = c;
667         state->m       = m;
668
669         /* create a child process to take out a transaction and 
670            write the data.
671         */
672         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
673         if (handle == NULL) {
674                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
675                 talloc_free(state);
676                 return -1;
677         }
678
679         /* we need to wait for the replies */
680         *async_reply = true;
681
682         /* need to keep the control structure around */
683         talloc_steal(state, c);
684
685         /* but we won't wait forever */
686         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
687                         ctdb_persistent_lock_timeout, state);
688
689         return 0;
690 }
691
692
693 /*
694   called when a client has finished a local commit in a transaction to 
695   a persistent database
696  */
697 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
698                                      struct ctdb_req_control *c)
699 {
700         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
701         struct ctdb_db_context *ctdb_db;
702
703         ctdb_db = find_ctdb_db(ctdb, client->db_id);
704         if (ctdb_db == NULL) {
705                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
706                                  "Unknown database 0x%08x\n", client->db_id));
707                 return -1;
708         }
709         if (!ctdb_db->transaction_active) {
710                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
711                                  "Database 0x%08x has no transaction commit "
712                                  "started\n", client->db_id));
713                 return -1;
714         }
715
716         ctdb_db->transaction_active = false;
717         client->db_id = 0;
718
719         if (client->num_persistent_updates == 0) {
720                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
721                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
722                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
723                 return -1;
724         }
725         client->num_persistent_updates--;
726
727         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
728                             "transaction commit db_id[0x%08x]\n",
729                             client->client_id, ctdb_db->db_id));
730
731         return 0;
732 }
733
734 /*
735   called when a client gets an error committing its database
736   during a transaction commit
737  */
738 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
739                                   struct ctdb_req_control *c)
740 {
741         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
742         struct ctdb_db_context *ctdb_db;
743
744         ctdb_db = find_ctdb_db(ctdb, client->db_id);
745         if (ctdb_db == NULL) {
746                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
747                                  "Unknown database 0x%08x\n", client->db_id));
748                 return -1;
749         }
750         if (!ctdb_db->transaction_active) {
751                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
752                                  "Database 0x%08x has no transaction commit "
753                                  "started\n", client->db_id));
754                 return -1;
755         }
756
757         ctdb_db->transaction_active = false;
758         client->db_id = 0;
759
760         if (client->num_persistent_updates == 0) {
761                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
762         } else {
763                 client->num_persistent_updates--;
764         }
765
766         DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
767                          " db_id[0x%08x] - forcing recovery\n",
768                          ctdb_db->db_id));
769         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
770
771         return 0;
772 }
773
774 /**
775  * Tell whether a transaction is active on this node on the give DB.
776  */
777 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
778                                    struct ctdb_req_control *c,
779                                    uint32_t db_id)
780 {
781         struct ctdb_db_context *ctdb_db;
782         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
783
784         ctdb_db = find_ctdb_db(ctdb, db_id);
785         if (!ctdb_db) {
786                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
787                 return -1;
788         }
789
790         if (client->db_id == db_id) {
791                 return 0;
792         }
793
794         if (ctdb_db->transaction_active) {
795                 return 1;
796         } else {
797                 return 0;
798         }
799 }
800
801 /*
802   backwards compatibility:
803
804   start a persistent store operation. passing both the key, header and
805   data to the daemon. If the client disconnects before it has issued
806   a persistent_update call to the daemon we trigger a full recovery
807   to ensure the databases are brought back in sync.
808   for now we ignore the recdata that the client has passed to us.
809  */
810 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
811                                       struct ctdb_req_control *c,
812                                       TDB_DATA recdata)
813 {
814         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
815
816         if (client == NULL) {
817                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
818                 return -1;
819         }
820
821         client->num_persistent_updates++;
822
823         return 0;
824 }
825
826 /* 
827   backwards compatibility:
828
829   called to tell ctdbd that it is no longer doing a persistent update 
830 */
831 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
832                                               struct ctdb_req_control *c,
833                                               TDB_DATA recdata)
834 {
835         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
836
837         if (client == NULL) {
838                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
839                 return -1;
840         }
841
842         if (client->num_persistent_updates > 0) {
843                 client->num_persistent_updates--;
844         }
845
846         return 0;
847 }
848
849
850 /*
851   backwards compatibility:
852
853   single record varient of ctdb_control_trans2_commit for older clients
854  */
855 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
856                                       struct ctdb_req_control *c, 
857                                       TDB_DATA recdata, bool *async_reply)
858 {
859         struct ctdb_marshall_buffer *m;
860         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
861         TDB_DATA key, data;
862
863         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
864             rec->keylen + rec->datalen) {
865                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
866                 return -1;
867         }
868
869         key.dptr = &rec->data[0];
870         key.dsize = rec->keylen;
871         data.dptr = &rec->data[rec->keylen];
872         data.dsize = rec->datalen;
873
874         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
875         CTDB_NO_MEMORY(ctdb, m);
876
877         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
878 }
879
880 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
881                                   uint32_t db_id,
882                                   uint64_t *seqnum)
883 {
884         int32_t ret;
885         struct ctdb_db_context *ctdb_db;
886         const char *keyname = CTDB_DB_SEQNUM_KEY;
887         TDB_DATA key;
888         TDB_DATA data;
889         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
890
891         ctdb_db = find_ctdb_db(ctdb, db_id);
892         if (!ctdb_db) {
893                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
894                 ret = -1;
895                 goto done;
896         }
897
898         key.dptr = (uint8_t *)discard_const(keyname);
899         key.dsize = strlen(keyname) + 1;
900
901         ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
902         if (ret != 0) {
903                 goto done;
904         }
905
906         if (data.dsize != sizeof(uint64_t)) {
907                 *seqnum = 0;
908                 goto done;
909         }
910
911         *seqnum = *(uint64_t *)data.dptr;
912
913 done:
914         talloc_free(mem_ctx);
915         return ret;
916 }
917
918 /**
919  * Get the sequence number of a persistent database.
920  */
921 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
922                                    TDB_DATA indata,
923                                    TDB_DATA *outdata)
924 {
925         uint32_t db_id;
926         int32_t ret;
927         uint64_t seqnum;
928
929         db_id = *(uint32_t *)indata.dptr;
930         ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
931         if (ret != 0) {
932                 goto done;
933         }
934
935         outdata->dsize = sizeof(uint64_t);
936         outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
937         if (outdata->dptr == NULL) {
938                 ret = -1;
939                 goto done;
940         }
941
942         *(outdata->dptr) = seqnum;
943
944 done:
945         return ret;
946 }