0627037ea9246ef2dbe70df3de243aa0355a9451
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  bookkeeping for one commit that is being rolled out to the other nodes.
  One instance is created per trans2/trans3 commit control and is handed
  to ctdb_persistent_callback() as private data for every
  CTDB_CONTROL_UPDATE_RECORD that gets sent.
 */
struct ctdb_persistent_state {
        /* daemon context the commit runs in */
        struct ctdb_context *ctdb;
        /* database being committed to; only set by trans3_commit */
        struct ctdb_db_context *ctdb_db;
        /* client that issued the commit; only set by trans3_commit */
        struct ctdb_client *client;
        /* the commit control that is answered once all nodes replied */
        struct ctdb_req_control *c;
        /* error message of the most recent failed node reply */
        const char *errormsg;
        /* update_record controls sent and not yet acknowledged */
        uint32_t num_pending;
        /* status of the most recent failed node reply */
        int32_t status;
        /* number of node replies that carried a non-zero status */
        uint32_t num_failed;
        /* total number of update_record controls sent */
        uint32_t num_sent;
};
39
40 /*
41   1) all nodes fail, and all nodes reply
42   2) some nodes fail, all nodes reply
43   3) some nodes timeout
44   4) all nodes succeed
45  */
46
47 /*
48   called when a node has acknowledged a ctdb_control_update_record call
49  */
50 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
51                                      int32_t status, TDB_DATA data, 
52                                      const char *errormsg,
53                                      void *private_data)
54 {
55         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
56                                                               struct ctdb_persistent_state);
57         enum ctdb_trans2_commit_error etype;
58
59         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
60                 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
61                                    "during recovery\n"));
62                 return;
63         }
64
65         if (status != 0) {
66                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
67                          status, errormsg?errormsg:"no error message given"));
68                 state->status = status;
69                 state->errormsg = errormsg;
70                 state->num_failed++;
71
72                 /*
73                  * If a node failed to complete the update_record control,
74                  * then either a recovery is already running or something
75                  * bad is going on. So trigger a recovery and let the
76                  * recovery finish the transaction, sending back the reply
77                  * for the trans3_commit control to the client.
78                  */
79                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
80                 return;
81         }
82
83         state->num_pending--;
84
85         if (state->num_pending != 0) {
86                 return;
87         }
88
89         if (state->num_failed == state->num_sent) {
90                 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
91         } else if (state->num_failed != 0) {
92                 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
93         } else {
94                 etype = CTDB_TRANS2_COMMIT_SUCCESS;
95         }
96
97         ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
98         talloc_free(state);
99 }
100
101 /*
102   called if persistent store times out
103  */
104 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
105                                          struct timeval t, void *private_data)
106 {
107         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
108
109         if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
110                 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
111                                    "timeout during recovery\n"));
112                 return;
113         }
114
115         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
116                                    "timeout in ctdb_persistent_state");
117
118         talloc_free(state);
119 }
120
121 /*
122   store a set of persistent records - called from a ctdb client when it has updated
123   some records in a persistent database. The client will have the record
124   locked for the duration of this call. The client is the dmaster when 
125   this call is made
126  */
127 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
128                                    struct ctdb_req_control *c, 
129                                    TDB_DATA recdata, bool *async_reply)
130 {
131         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
132         struct ctdb_persistent_state *state;
133         int i;
134         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
135         struct ctdb_db_context *ctdb_db;
136
137         ctdb_db = find_ctdb_db(ctdb, m->db_id);
138         if (ctdb_db == NULL) {
139                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
140                                  "Unknown database db_id[0x%08x]\n", m->db_id));
141                 return -1;
142         }
143
144         if (client == NULL) {
145                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
146                 return -1;
147         }
148
149         if (ctdb_db->unhealthy_reason) {
150                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
151                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
152                 return -1;
153         }
154
155         /* handling num_persistent_updates is a bit strange - 
156            there are 3 cases
157              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
158                 They don't expect num_persistent_updates to be used at all
159
160              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
161                 this commit to then decrement it
162
163              3) new clients which use TRANS2 commit functions, and
164                 expect this function to increment the counter, and
165                 then have it decremented in ctdb_control_trans2_error
166                 or ctdb_control_trans2_finished
167         */
168         switch (c->opcode) {
169         case CTDB_CONTROL_PERSISTENT_STORE:
170                 if (ctdb_db->transaction_active) {
171                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
172                                           "transaction is active on database "
173                                           "db_id[0x%08x] - refusing persistent "
174                                          " store for client id[0x%08x]\n",
175                                           ctdb_db->db_id, client->client_id));
176                         return -1;
177                 }
178                 if (client->num_persistent_updates > 0) {
179                         client->num_persistent_updates--;
180                 }
181                 break;
182         case CTDB_CONTROL_TRANS2_COMMIT:
183                 if (ctdb_db->transaction_active) {
184                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
185                                          " already a transaction commit "
186                                          "active on db_id[0x%08x] - forbidding "
187                                          "client_id[0x%08x] to commit\n",
188                                          ctdb_db->db_id, client->client_id));
189                         return -1;
190                 }
191                 if (client->db_id != 0) {
192                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
193                                          "client-db_id[0x%08x] != 0 "
194                                          "(client_id[0x%08x])\n",
195                                          client->db_id, client->client_id));
196                         return -1;
197                 }
198                 client->num_persistent_updates++;
199                 ctdb_db->transaction_active = true;
200                 client->db_id = m->db_id;
201                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
202                                   " commit transaction on db id[0x%08x]\n",
203                                   client->client_id, client->db_id));
204                 break;
205         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
206                 /* already updated from the first commit */
207                 if (client->db_id != m->db_id) {
208                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
209                                          "retry: client-db_id[0x%08x] != "
210                                          "db_id[0x%08x] (client_id[0x%08x])\n",
211                                          client->db_id,
212                                          m->db_id, client->client_id));
213                         return -1;
214                 }
215                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
216                                     "transaction commit retry on "
217                                     "db_id[0x%08x]\n",
218                                     client->client_id, client->db_id));
219                 break;
220         }
221
222         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
223                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
224                 return -1;
225         }
226
227         state = talloc_zero(ctdb, struct ctdb_persistent_state);
228         CTDB_NO_MEMORY(ctdb, state);
229
230         state->ctdb = ctdb;
231         state->c    = c;
232
233         for (i=0;i<ctdb->vnn_map->size;i++) {
234                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
235                 int ret;
236
237                 /* only send to active nodes */
238                 if (node->flags & NODE_FLAGS_INACTIVE) {
239                         continue;
240                 }
241
242                 /* don't send to ourselves */
243                 if (node->pnn == ctdb->pnn) {
244                         continue;
245                 }
246                 
247                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
248                                                c->client_id, 0, recdata, 
249                                                ctdb_persistent_callback, state);
250                 if (ret == -1) {
251                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
252                         talloc_free(state);
253                         return -1;
254                 }
255
256                 state->num_pending++;
257                 state->num_sent++;
258         }
259
260         if (state->num_pending == 0) {
261                 talloc_free(state);
262                 return 0;
263         }
264         
265         /* we need to wait for the replies */
266         *async_reply = true;
267
268         /* need to keep the control structure around */
269         talloc_steal(state, c);
270
271         /* but we won't wait forever */
272         event_add_timed(ctdb->ev, state, 
273                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
274                         ctdb_persistent_store_timeout, state);
275
276         return 0;
277 }
278
279 static int ctdb_persistent_state_destructor(struct ctdb_persistent_state *state)
280 {
281         if (state->client != NULL) {
282                 state->client->db_id = 0;
283         }
284
285         if (state->ctdb_db != NULL) {
286                 state->ctdb_db->persistent_state = NULL;
287         }
288
289         return 0;
290 }
291
292 /*
293  * Store a set of persistent records.
294  * This is used to roll out a transaction to all nodes.
295  */
296 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
297                                    struct ctdb_req_control *c,
298                                    TDB_DATA recdata, bool *async_reply)
299 {
300         struct ctdb_client *client;
301         struct ctdb_persistent_state *state;
302         int i;
303         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
304         struct ctdb_db_context *ctdb_db;
305
306         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
307                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
308                 return -1;
309         }
310
311         client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
312         if (client == NULL) {
313                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
314                                  "to a client. Returning error\n"));
315                 return -1;
316         }
317
318         if (client->db_id != 0) {
319                 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans3_commit: "
320                                  "client-db_id[0x%08x] != 0 "
321                                  "(client_id[0x%08x]): trans3_commit active?\n",
322                                  client->db_id, client->client_id));
323                 return -1;
324         }
325
326         ctdb_db = find_ctdb_db(ctdb, m->db_id);
327         if (ctdb_db == NULL) {
328                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
329                                  "Unknown database db_id[0x%08x]\n", m->db_id));
330                 return -1;
331         }
332
333         if (ctdb_db->persistent_state != NULL) {
334                 DEBUG(DEBUG_ERR, (__location__ " Error: "
335                                   "ctdb_control_trans3_commit "
336                                   "called while a transaction commit is "
337                                   "active. db_id[0x%08x]\n", m->db_id));
338                 return -1;
339         }
340
341         ctdb_db->persistent_state = talloc_zero(ctdb_db,
342                                                 struct ctdb_persistent_state);
343         CTDB_NO_MEMORY(ctdb, ctdb_db->persistent_state);
344
345         client->db_id = m->db_id;
346
347         state = ctdb_db->persistent_state;
348         state->ctdb = ctdb;
349         state->ctdb_db = ctdb_db;
350         state->c    = c;
351         state->client = client;
352
353         talloc_set_destructor(state, ctdb_persistent_state_destructor);
354
355         for (i = 0; i < ctdb->vnn_map->size; i++) {
356                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
357                 int ret;
358
359                 /* only send to active nodes */
360                 if (node->flags & NODE_FLAGS_INACTIVE) {
361                         continue;
362                 }
363
364                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
365                                                CTDB_CONTROL_UPDATE_RECORD,
366                                                c->client_id, 0, recdata,
367                                                ctdb_persistent_callback,
368                                                state);
369                 if (ret == -1) {
370                         DEBUG(DEBUG_ERR,("Unable to send "
371                                          "CTDB_CONTROL_UPDATE_RECORD "
372                                          "to pnn %u\n", node->pnn));
373                         talloc_free(state);
374                         return -1;
375                 }
376
377                 state->num_pending++;
378                 state->num_sent++;
379         }
380
381         if (state->num_pending == 0) {
382                 talloc_free(state);
383                 return 0;
384         }
385
386         /* we need to wait for the replies */
387         *async_reply = true;
388
389         /* need to keep the control structure around */
390         talloc_steal(state, c);
391
392         /* but we won't wait forever */
393         event_add_timed(ctdb->ev, state,
394                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
395                         ctdb_persistent_store_timeout, state);
396
397         return 0;
398 }
399
400
/*
  state of one CTDB_CONTROL_UPDATE_RECORD request while a child process
  writes the records into the local database
 */
struct ctdb_persistent_write_state {
        /* database the records are written to */
        struct ctdb_db_context *ctdb_db;
        /* the marshalled records; points into the control's payload */
        struct ctdb_marshall_buffer *m;
        /* the update_record control that is answered when the write is done */
        struct ctdb_req_control *c;
};
406
407
408 /*
409   called from a child process to write the data
410  */
411 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
412 {
413         int ret, i;
414         struct ctdb_rec_data *rec = NULL;
415         struct ctdb_marshall_buffer *m = state->m;
416
417         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
418         if (ret == -1) {
419                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
420                                  state->ctdb_db->db_id));
421                 return -1;
422         }
423
424         for (i=0;i<m->count;i++) {
425                 struct ctdb_ltdb_header oldheader;
426                 struct ctdb_ltdb_header header;
427                 TDB_DATA key, data, olddata;
428                 TALLOC_CTX *tmp_ctx = talloc_new(state);
429
430                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
431                 
432                 if (rec == NULL) {
433                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
434                                          i, state->ctdb_db->db_id));
435                         talloc_free(tmp_ctx);
436                         goto failed;                    
437                 }
438
439                 /* fetch the old header and ensure the rsn is less than the new rsn */
440                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
441                 if (ret != 0) {
442                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
443                                          state->ctdb_db->db_id));
444                         talloc_free(tmp_ctx);
445                         goto failed;
446                 }
447
448                 if (oldheader.rsn >= header.rsn &&
449                     (olddata.dsize != data.dsize || 
450                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
451                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
452                                           state->ctdb_db->db_id, 
453                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
454                         talloc_free(tmp_ctx);
455                         goto failed;
456                 }
457
458                 talloc_free(tmp_ctx);
459
460                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
461                 if (ret != 0) {
462                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
463                                           state->ctdb_db->db_id));
464                         goto failed;
465                 }
466         }
467
468         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
469         if (ret == -1) {
470                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
471                                  state->ctdb_db->db_id));
472                 return -1;
473         }
474
475         return 0;
476         
477 failed:
478         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
479         return -1;
480 }
481
482
483 /*
484   called when we the child has completed the persistent write
485   on our behalf
486  */
487 static void ctdb_persistent_write_callback(int status, void *private_data)
488 {
489         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
490                                                                    struct ctdb_persistent_write_state);
491
492
493         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
494
495         talloc_free(state);
496 }
497
498 /*
499   called if our lockwait child times out
500  */
501 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
502                                          struct timeval t, void *private_data)
503 {
504         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
505                                                                    struct ctdb_persistent_write_state);
506         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
507         talloc_free(state);
508 }
509
/*
  handle for one child process that writes records on behalf of the
  daemon; created by ctdb_childwrite()
 */
struct childwrite_handle {
        /* daemon context, used for the statistics counters */
        struct ctdb_context *ctdb;
        /* database the child writes to */
        struct ctdb_db_context *ctdb_db;
        /* read event on fd[0], fires when the child reports its status */
        struct fd_event *fde;
        /* pipe: the parent reads fd[0], the child writes fd[1] */
        int fd[2];
        /* pid of the child process */
        pid_t child;
        /* opaque pointer handed to callback */
        void *private_data;
        /* invoked with the status byte read from the child */
        void (*callback)(int, void *);
        /* when the handle was set up, for the latency statistics */
        struct timeval start_time;
};
520
521 static int childwrite_destructor(struct childwrite_handle *h)
522 {
523         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
524         kill(h->child, SIGKILL);
525         return 0;
526 }
527
528 /* called when the child process has finished writing the record to the
529    database
530 */
531 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
532                              uint16_t flags, void *private_data)
533 {
534         struct childwrite_handle *h = talloc_get_type(private_data, 
535                                                      struct childwrite_handle);
536         void *p = h->private_data;
537         void (*callback)(int, void *) = h->callback;
538         pid_t child = h->child;
539         TALLOC_CTX *tmp_ctx = talloc_new(ev);
540         int ret;
541         char c;
542
543         CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
544         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
545
546         /* the handle needs to go away when the context is gone - when
547            the handle goes away this implicitly closes the pipe, which
548            kills the child */
549         talloc_steal(tmp_ctx, h);
550
551         talloc_set_destructor(h, NULL);
552
553         ret = read(h->fd[0], &c, 1);
554         if (ret < 1) {
555                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
556                 c = 1;
557         }
558
559         callback(c, p);
560
561         kill(child, SIGKILL);
562         talloc_free(tmp_ctx);
563 }
564
565 /* this creates a child process which will take out a tdb transaction
566    and write the record to the database.
567 */
568 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
569                                 void (*callback)(int, void *private_data),
570                                 struct ctdb_persistent_write_state *state)
571 {
572         struct childwrite_handle *result;
573         int ret;
574         pid_t parent = getpid();
575
576         CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
577         CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
578
579         if (!(result = talloc_zero(state, struct childwrite_handle))) {
580                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
581                 return NULL;
582         }
583
584         ret = pipe(result->fd);
585
586         if (ret != 0) {
587                 talloc_free(result);
588                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
589                 return NULL;
590         }
591
592         result->child = ctdb_fork(ctdb_db->ctdb);
593
594         if (result->child == (pid_t)-1) {
595                 close(result->fd[0]);
596                 close(result->fd[1]);
597                 talloc_free(result);
598                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
599                 return NULL;
600         }
601
602         result->callback = callback;
603         result->private_data = state;
604         result->ctdb = ctdb_db->ctdb;
605         result->ctdb_db = ctdb_db;
606
607         if (result->child == 0) {
608                 char c = 0;
609
610                 close(result->fd[0]);
611                 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
612                 ret = ctdb_persistent_store(state);
613                 if (ret != 0) {
614                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
615                         c = 1;
616                 }
617
618                 write(result->fd[1], &c, 1);
619
620                 /* make sure we die when our parent dies */
621                 while (kill(parent, 0) == 0 || errno != ESRCH) {
622                         sleep(5);
623                 }
624                 _exit(0);
625         }
626
627         close(result->fd[1]);
628         set_close_on_exec(result->fd[0]);
629
630         talloc_set_destructor(result, childwrite_destructor);
631
632         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
633
634         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
635                                    EVENT_FD_READ, childwrite_handler,
636                                    (void *)result);
637         if (result->fde == NULL) {
638                 talloc_free(result);
639                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
640                 return NULL;
641         }
642         tevent_fd_set_auto_close(result->fde);
643
644         result->start_time = timeval_current();
645
646         return result;
647 }
648
649 /* 
650    update a record on this node if the new record has a higher rsn than the
651    current record
652  */
653 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
654                                    struct ctdb_req_control *c, TDB_DATA recdata, 
655                                    bool *async_reply)
656 {
657         struct ctdb_db_context *ctdb_db;
658         struct ctdb_persistent_write_state *state;
659         struct childwrite_handle *handle;
660         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
661
662         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
663                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
664                 return -1;
665         }
666
667         ctdb_db = find_ctdb_db(ctdb, m->db_id);
668         if (ctdb_db == NULL) {
669                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
670                 return -1;
671         }
672
673         if (ctdb_db->unhealthy_reason) {
674                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
675                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
676                 return -1;
677         }
678
679         state = talloc(ctdb, struct ctdb_persistent_write_state);
680         CTDB_NO_MEMORY(ctdb, state);
681
682         state->ctdb_db = ctdb_db;
683         state->c       = c;
684         state->m       = m;
685
686         /* create a child process to take out a transaction and 
687            write the data.
688         */
689         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
690         if (handle == NULL) {
691                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
692                 talloc_free(state);
693                 return -1;
694         }
695
696         /* we need to wait for the replies */
697         *async_reply = true;
698
699         /* need to keep the control structure around */
700         talloc_steal(state, c);
701
702         /* but we won't wait forever */
703         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
704                         ctdb_persistent_lock_timeout, state);
705
706         return 0;
707 }
708
709
710 /*
711   called when a client has finished a local commit in a transaction to 
712   a persistent database
713  */
714 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
715                                      struct ctdb_req_control *c)
716 {
717         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
718         struct ctdb_db_context *ctdb_db;
719
720         ctdb_db = find_ctdb_db(ctdb, client->db_id);
721         if (ctdb_db == NULL) {
722                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
723                                  "Unknown database 0x%08x\n", client->db_id));
724                 return -1;
725         }
726         if (!ctdb_db->transaction_active) {
727                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
728                                  "Database 0x%08x has no transaction commit "
729                                  "started\n", client->db_id));
730                 return -1;
731         }
732
733         ctdb_db->transaction_active = false;
734         client->db_id = 0;
735
736         if (client->num_persistent_updates == 0) {
737                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
738                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
739                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
740                 return -1;
741         }
742         client->num_persistent_updates--;
743
744         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
745                             "transaction commit db_id[0x%08x]\n",
746                             client->client_id, ctdb_db->db_id));
747
748         return 0;
749 }
750
751 /*
752   called when a client gets an error committing its database
753   during a transaction commit
754  */
755 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
756                                   struct ctdb_req_control *c)
757 {
758         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
759         struct ctdb_db_context *ctdb_db;
760
761         ctdb_db = find_ctdb_db(ctdb, client->db_id);
762         if (ctdb_db == NULL) {
763                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
764                                  "Unknown database 0x%08x\n", client->db_id));
765                 return -1;
766         }
767         if (!ctdb_db->transaction_active) {
768                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
769                                  "Database 0x%08x has no transaction commit "
770                                  "started\n", client->db_id));
771                 return -1;
772         }
773
774         ctdb_db->transaction_active = false;
775         client->db_id = 0;
776
777         if (client->num_persistent_updates == 0) {
778                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
779         } else {
780                 client->num_persistent_updates--;
781         }
782
783         DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
784                          " db_id[0x%08x] - forcing recovery\n",
785                          ctdb_db->db_id));
786         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
787
788         return 0;
789 }
790
791 /**
792  * Tell whether a transaction is active on this node on the give DB.
793  */
794 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
795                                    struct ctdb_req_control *c,
796                                    uint32_t db_id)
797 {
798         struct ctdb_db_context *ctdb_db;
799         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
800
801         ctdb_db = find_ctdb_db(ctdb, db_id);
802         if (!ctdb_db) {
803                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
804                 return -1;
805         }
806
807         if (client->db_id == db_id) {
808                 return 0;
809         }
810
811         if (ctdb_db->transaction_active) {
812                 return 1;
813         } else {
814                 return 0;
815         }
816 }
817
818 /*
819   backwards compatibility:
820
821   start a persistent store operation. passing both the key, header and
822   data to the daemon. If the client disconnects before it has issued
823   a persistent_update call to the daemon we trigger a full recovery
824   to ensure the databases are brought back in sync.
825   for now we ignore the recdata that the client has passed to us.
826  */
827 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
828                                       struct ctdb_req_control *c,
829                                       TDB_DATA recdata)
830 {
831         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
832
833         if (client == NULL) {
834                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
835                 return -1;
836         }
837
838         client->num_persistent_updates++;
839
840         return 0;
841 }
842
843 /* 
844   backwards compatibility:
845
846   called to tell ctdbd that it is no longer doing a persistent update 
847 */
848 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
849                                               struct ctdb_req_control *c,
850                                               TDB_DATA recdata)
851 {
852         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
853
854         if (client == NULL) {
855                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
856                 return -1;
857         }
858
859         if (client->num_persistent_updates > 0) {
860                 client->num_persistent_updates--;
861         }
862
863         return 0;
864 }
865
866
867 /*
868   backwards compatibility:
869
870   single record varient of ctdb_control_trans2_commit for older clients
871  */
872 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
873                                       struct ctdb_req_control *c, 
874                                       TDB_DATA recdata, bool *async_reply)
875 {
876         struct ctdb_marshall_buffer *m;
877         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
878         TDB_DATA key, data;
879
880         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
881             rec->keylen + rec->datalen) {
882                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
883                 return -1;
884         }
885
886         key.dptr = &rec->data[0];
887         key.dsize = rec->keylen;
888         data.dptr = &rec->data[rec->keylen];
889         data.dsize = rec->datalen;
890
891         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
892         CTDB_NO_MEMORY(ctdb, m);
893
894         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
895 }
896
897 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
898                                   uint32_t db_id,
899                                   uint64_t *seqnum)
900 {
901         int32_t ret;
902         struct ctdb_db_context *ctdb_db;
903         const char *keyname = CTDB_DB_SEQNUM_KEY;
904         TDB_DATA key;
905         TDB_DATA data;
906         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
907
908         ctdb_db = find_ctdb_db(ctdb, db_id);
909         if (!ctdb_db) {
910                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
911                 ret = -1;
912                 goto done;
913         }
914
915         key.dptr = (uint8_t *)discard_const(keyname);
916         key.dsize = strlen(keyname) + 1;
917
918         ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
919         if (ret != 0) {
920                 goto done;
921         }
922
923         if (data.dsize != sizeof(uint64_t)) {
924                 *seqnum = 0;
925                 goto done;
926         }
927
928         *seqnum = *(uint64_t *)data.dptr;
929
930 done:
931         talloc_free(mem_ctx);
932         return ret;
933 }
934
935 /**
936  * Get the sequence number of a persistent database.
937  */
938 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
939                                    TDB_DATA indata,
940                                    TDB_DATA *outdata)
941 {
942         uint32_t db_id;
943         int32_t ret;
944         uint64_t seqnum;
945
946         db_id = *(uint32_t *)indata.dptr;
947         ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
948         if (ret != 0) {
949                 goto done;
950         }
951
952         outdata->dsize = sizeof(uint64_t);
953         outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
954         if (outdata->dptr == NULL) {
955                 ret = -1;
956                 goto done;
957         }
958
959         *(outdata->dptr) = seqnum;
960
961 done:
962         return ret;
963 }