d38aa8d9e775539522ba610cfc8c4eadd1ca5863
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  bookkeeping for one commit that is being distributed to the other nodes
 */
struct ctdb_persistent_state {
        struct ctdb_context *ctdb;
        /* the client control that gets the aggregated reply */
        struct ctdb_req_control *c;
        /* message recorded from a node reply that reported a failure */
        const char *errormsg;
        /* update controls sent for which no reply has arrived yet */
        uint32_t num_pending;
        /* status recorded from a node reply that reported a failure */
        int32_t status;
        /* number of node replies with a non-zero status */
        uint32_t num_failed;
        /* number of update controls that were sent out */
        uint32_t num_sent;
};
37
38 /*
39   1) all nodes fail, and all nodes reply
40   2) some nodes fail, all nodes reply
41   3) some nodes timeout
42   4) all nodes succeed
43  */
44
45 /*
46   called when a node has acknowledged a ctdb_control_update_record call
47  */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49                                      int32_t status, TDB_DATA data, 
50                                      const char *errormsg,
51                                      void *private_data)
52 {
53         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
54                                                               struct ctdb_persistent_state);
55
56         if (status != 0) {
57                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
58                          status, errormsg));
59                 state->status = status;
60                 state->errormsg = errormsg;
61                 state->num_failed++;
62         }
63         state->num_pending--;
64         if (state->num_pending == 0) {
65                 enum ctdb_trans2_commit_error etype;
66                 if (state->num_failed == state->num_sent) {
67                         etype = CTDB_TRANS2_COMMIT_ALLFAIL;
68                 } else if (state->num_failed != 0) {
69                         etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
70                 } else {
71                         etype = CTDB_TRANS2_COMMIT_SUCCESS;
72                 }
73                 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
74                 talloc_free(state);
75         }
76 }
77
78 /*
79   called if persistent store times out
80  */
81 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
82                                          struct timeval t, void *private_data)
83 {
84         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
85         
86         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
87                                    "timeout in ctdb_persistent_state");
88
89         talloc_free(state);
90 }
91
92 /*
93   store a set of persistent records - called from a ctdb client when it has updated
94   some records in a persistent database. The client will have the record
95   locked for the duration of this call. The client is the dmaster when 
96   this call is made
97  */
98 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
99                                    struct ctdb_req_control *c, 
100                                    TDB_DATA recdata, bool *async_reply)
101 {
102         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
103         struct ctdb_persistent_state *state;
104         int i;
105         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
106         struct ctdb_db_context *ctdb_db;
107
108         ctdb_db = find_ctdb_db(ctdb, m->db_id);
109         if (ctdb_db == NULL) {
110                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
111                                  "Unknown database db_id[0x%08x]\n", m->db_id));
112                 return -1;
113         }
114
115         if (client == NULL) {
116                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
117                 return -1;
118         }
119
120         if (ctdb_db->unhealthy_reason) {
121                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
122                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
123                 return -1;
124         }
125
126         /* handling num_persistent_updates is a bit strange - 
127            there are 3 cases
128              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
129                 They don't expect num_persistent_updates to be used at all
130
131              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
132                 this commit to then decrement it
133
134              3) new clients which use TRANS2 commit functions, and
135                 expect this function to increment the counter, and
136                 then have it decremented in ctdb_control_trans2_error
137                 or ctdb_control_trans2_finished
138         */
139         switch (c->opcode) {
140         case CTDB_CONTROL_PERSISTENT_STORE:
141                 if (ctdb_db->transaction_active) {
142                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
143                                           "transaction is active on database "
144                                           "db_id[0x%08x] - refusing persistent "
145                                          " store for client id[0x%08x]\n",
146                                           ctdb_db->db_id, client->client_id));
147                         return -1;
148                 }
149                 if (client->num_persistent_updates > 0) {
150                         client->num_persistent_updates--;
151                 }
152                 break;
153         case CTDB_CONTROL_TRANS2_COMMIT:
154                 if (ctdb_db->transaction_active) {
155                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
156                                          " already a transaction commit "
157                                          "active on db_id[0x%08x] - forbidding "
158                                          "client_id[0x%08x] to commit\n",
159                                          ctdb_db->db_id, client->client_id));
160                         return -1;
161                 }
162                 if (client->db_id != 0) {
163                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
164                                          "client-db_id[0x%08x] != 0 "
165                                          "(client_id[0x%08x])\n",
166                                          client->db_id, client->client_id));
167                         return -1;
168                 }
169                 client->num_persistent_updates++;
170                 ctdb_db->transaction_active = true;
171                 client->db_id = m->db_id;
172                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
173                                   " commit transaction on db id[0x%08x]\n",
174                                   client->client_id, client->db_id));
175                 break;
176         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
177                 /* already updated from the first commit */
178                 if (client->db_id != m->db_id) {
179                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
180                                          "retry: client-db_id[0x%08x] != "
181                                          "db_id[0x%08x] (client_id[0x%08x])\n",
182                                          client->db_id,
183                                          m->db_id, client->client_id));
184                         return -1;
185                 }
186                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
187                                     "transaction commit retry on "
188                                     "db_id[0x%08x]\n",
189                                     client->client_id, client->db_id));
190                 break;
191         }
192
193         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
194                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
195                 return -1;
196         }
197
198         state = talloc_zero(ctdb, struct ctdb_persistent_state);
199         CTDB_NO_MEMORY(ctdb, state);
200
201         state->ctdb = ctdb;
202         state->c    = c;
203
204         for (i=0;i<ctdb->vnn_map->size;i++) {
205                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
206                 int ret;
207
208                 /* only send to active nodes */
209                 if (node->flags & NODE_FLAGS_INACTIVE) {
210                         continue;
211                 }
212
213                 /* don't send to ourselves */
214                 if (node->pnn == ctdb->pnn) {
215                         continue;
216                 }
217                 
218                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
219                                                c->client_id, 0, recdata, 
220                                                ctdb_persistent_callback, state);
221                 if (ret == -1) {
222                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
223                         talloc_free(state);
224                         return -1;
225                 }
226
227                 state->num_pending++;
228                 state->num_sent++;
229         }
230
231         if (state->num_pending == 0) {
232                 talloc_free(state);
233                 return 0;
234         }
235         
236         /* we need to wait for the replies */
237         *async_reply = true;
238
239         /* need to keep the control structure around */
240         talloc_steal(state, c);
241
242         /* but we won't wait forever */
243         event_add_timed(ctdb->ev, state, 
244                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
245                         ctdb_persistent_store_timeout, state);
246
247         return 0;
248 }
249
250
251 /*
252  * Store a set of persistent records.
253  * This is used to roll out a transaction to all nodes.
254  */
255 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
256                                    struct ctdb_req_control *c,
257                                    TDB_DATA recdata, bool *async_reply)
258 {
259         struct ctdb_client *client;
260         struct ctdb_persistent_state *state;
261         int i;
262         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
263         struct ctdb_db_context *ctdb_db;
264
265         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
266                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
267                 return -1;
268         }
269
270         ctdb_db = find_ctdb_db(ctdb, m->db_id);
271         if (ctdb_db == NULL) {
272                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
273                                  "Unknown database db_id[0x%08x]\n", m->db_id));
274                 return -1;
275         }
276
277         client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
278         if (client == NULL) {
279                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
280                                  "to a client. Returning error\n"));
281                 return -1;
282         }
283
284         state = talloc_zero(ctdb, struct ctdb_persistent_state);
285         CTDB_NO_MEMORY(ctdb, state);
286
287         state->ctdb = ctdb;
288         state->c    = c;
289
290         for (i = 0; i < ctdb->vnn_map->size; i++) {
291                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
292                 int ret;
293
294                 /* only send to active nodes */
295                 if (node->flags & NODE_FLAGS_INACTIVE) {
296                         continue;
297                 }
298
299                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
300                                                CTDB_CONTROL_UPDATE_RECORD,
301                                                c->client_id, 0, recdata,
302                                                ctdb_persistent_callback,
303                                                state);
304                 if (ret == -1) {
305                         DEBUG(DEBUG_ERR,("Unable to send "
306                                          "CTDB_CONTROL_UPDATE_RECORD "
307                                          "to pnn %u\n", node->pnn));
308                         talloc_free(state);
309                         return -1;
310                 }
311
312                 state->num_pending++;
313                 state->num_sent++;
314         }
315
316         if (state->num_pending == 0) {
317                 talloc_free(state);
318                 return 0;
319         }
320
321         /* we need to wait for the replies */
322         *async_reply = true;
323
324         /* need to keep the control structure around */
325         talloc_steal(state, c);
326
327         /* but we won't wait forever */
328         event_add_timed(ctdb->ev, state,
329                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
330                         ctdb_persistent_store_timeout, state);
331
332         return 0;
333 }
334
335
/*
  everything the write child and the reply handlers need for one
  CTDB_CONTROL_UPDATE_RECORD request
 */
struct ctdb_persistent_write_state {
        /* database the records are written to */
        struct ctdb_db_context *ctdb_db;
        /* marshalled records to write */
        struct ctdb_marshall_buffer *m;
        /* the control that is answered once the write has finished */
        struct ctdb_req_control *c;
};
341
342
343 /*
344   called from a child process to write the data
345  */
346 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
347 {
348         int ret, i;
349         struct ctdb_rec_data *rec = NULL;
350         struct ctdb_marshall_buffer *m = state->m;
351
352         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
353         if (ret == -1) {
354                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
355                                  state->ctdb_db->db_id));
356                 return -1;
357         }
358
359         for (i=0;i<m->count;i++) {
360                 struct ctdb_ltdb_header oldheader;
361                 struct ctdb_ltdb_header header;
362                 TDB_DATA key, data, olddata;
363                 TALLOC_CTX *tmp_ctx = talloc_new(state);
364
365                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
366                 
367                 if (rec == NULL) {
368                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
369                                          i, state->ctdb_db->db_id));
370                         talloc_free(tmp_ctx);
371                         goto failed;                    
372                 }
373
374                 /* fetch the old header and ensure the rsn is less than the new rsn */
375                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
376                 if (ret != 0) {
377                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
378                                          state->ctdb_db->db_id));
379                         talloc_free(tmp_ctx);
380                         goto failed;
381                 }
382
383                 if (oldheader.rsn >= header.rsn &&
384                     (olddata.dsize != data.dsize || 
385                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
386                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
387                                           state->ctdb_db->db_id, 
388                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
389                         talloc_free(tmp_ctx);
390                         goto failed;
391                 }
392
393                 talloc_free(tmp_ctx);
394
395                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
396                 if (ret != 0) {
397                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
398                                           state->ctdb_db->db_id));
399                         goto failed;
400                 }
401         }
402
403         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
404         if (ret == -1) {
405                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
406                                  state->ctdb_db->db_id));
407                 return -1;
408         }
409
410         return 0;
411         
412 failed:
413         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
414         return -1;
415 }
416
417
418 /*
419   called when we the child has completed the persistent write
420   on our behalf
421  */
422 static void ctdb_persistent_write_callback(int status, void *private_data)
423 {
424         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
425                                                                    struct ctdb_persistent_write_state);
426
427
428         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
429
430         talloc_free(state);
431 }
432
433 /*
434   called if our lockwait child times out
435  */
436 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
437                                          struct timeval t, void *private_data)
438 {
439         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
440                                                                    struct ctdb_persistent_write_state);
441         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
442         talloc_free(state);
443 }
444
/*
  parent side handle for one child process started by ctdb_childwrite
 */
struct childwrite_handle {
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
        /* fd event watching the read end of the pipe */
        struct fd_event *fde;
        /* pipe to the child: the parent reads fd[0], the child writes fd[1] */
        int fd[2];
        /* pid of the child process */
        pid_t child;
        /* argument handed back to callback */
        void *private_data;
        /* invoked with the status byte read from the child */
        void (*callback)(int, void *);
        /* when the child was started, used for the latency statistics */
        struct timeval start_time;
};
455
456 static int childwrite_destructor(struct childwrite_handle *h)
457 {
458         h->ctdb->statistics.pending_childwrite_calls--;
459         kill(h->child, SIGKILL);
460         return 0;
461 }
462
463 /* called when the child process has finished writing the record to the
464    database
465 */
466 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
467                              uint16_t flags, void *private_data)
468 {
469         struct childwrite_handle *h = talloc_get_type(private_data, 
470                                                      struct childwrite_handle);
471         void *p = h->private_data;
472         void (*callback)(int, void *) = h->callback;
473         pid_t child = h->child;
474         TALLOC_CTX *tmp_ctx = talloc_new(ev);
475         int ret;
476         char c;
477
478         ctdb_latency(h->ctdb_db, "persistent", &h->ctdb->statistics.max_childwrite_latency, h->start_time);
479         h->ctdb->statistics.pending_childwrite_calls--;
480
481         /* the handle needs to go away when the context is gone - when
482            the handle goes away this implicitly closes the pipe, which
483            kills the child */
484         talloc_steal(tmp_ctx, h);
485
486         talloc_set_destructor(h, NULL);
487
488         ret = read(h->fd[0], &c, 1);
489         if (ret < 1) {
490                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
491                 c = 1;
492         }
493
494         callback(c, p);
495
496         kill(child, SIGKILL);
497         talloc_free(tmp_ctx);
498 }
499
500 /* this creates a child process which will take out a tdb transaction
501    and write the record to the database.
502 */
503 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
504                                 void (*callback)(int, void *private_data),
505                                 struct ctdb_persistent_write_state *state)
506 {
507         struct childwrite_handle *result;
508         int ret;
509         pid_t parent = getpid();
510
511         ctdb_db->ctdb->statistics.childwrite_calls++;
512         ctdb_db->ctdb->statistics.pending_childwrite_calls++;
513
514         if (!(result = talloc_zero(state, struct childwrite_handle))) {
515                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
516                 return NULL;
517         }
518
519         ret = pipe(result->fd);
520
521         if (ret != 0) {
522                 talloc_free(result);
523                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
524                 return NULL;
525         }
526
527         result->child = fork();
528
529         if (result->child == (pid_t)-1) {
530                 close(result->fd[0]);
531                 close(result->fd[1]);
532                 talloc_free(result);
533                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
534                 return NULL;
535         }
536
537         result->callback = callback;
538         result->private_data = state;
539         result->ctdb = ctdb_db->ctdb;
540         result->ctdb_db = ctdb_db;
541
542         if (result->child == 0) {
543                 char c = 0;
544
545                 close(result->fd[0]);
546                 ret = ctdb_persistent_store(state);
547                 if (ret != 0) {
548                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
549                         c = 1;
550                 }
551
552                 write(result->fd[1], &c, 1);
553
554                 /* make sure we die when our parent dies */
555                 while (kill(parent, 0) == 0 || errno != ESRCH) {
556                         sleep(5);
557                 }
558                 _exit(0);
559         }
560
561         close(result->fd[1]);
562         set_close_on_exec(result->fd[0]);
563
564         talloc_set_destructor(result, childwrite_destructor);
565
566         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
567
568         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
569                                    EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
570                                    (void *)result);
571         if (result->fde == NULL) {
572                 talloc_free(result);
573                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
574                 return NULL;
575         }
576
577         result->start_time = timeval_current();
578
579         return result;
580 }
581
582 /* 
583    update a record on this node if the new record has a higher rsn than the
584    current record
585  */
586 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
587                                    struct ctdb_req_control *c, TDB_DATA recdata, 
588                                    bool *async_reply)
589 {
590         struct ctdb_db_context *ctdb_db;
591         struct ctdb_persistent_write_state *state;
592         struct childwrite_handle *handle;
593         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
594
595         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
596                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
597                 return -1;
598         }
599
600         ctdb_db = find_ctdb_db(ctdb, m->db_id);
601         if (ctdb_db == NULL) {
602                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
603                 return -1;
604         }
605
606         if (ctdb_db->unhealthy_reason) {
607                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
608                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
609                 return -1;
610         }
611
612         state = talloc(ctdb, struct ctdb_persistent_write_state);
613         CTDB_NO_MEMORY(ctdb, state);
614
615         state->ctdb_db = ctdb_db;
616         state->c       = c;
617         state->m       = m;
618
619         /* create a child process to take out a transaction and 
620            write the data.
621         */
622         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
623         if (handle == NULL) {
624                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
625                 talloc_free(state);
626                 return -1;
627         }
628
629         /* we need to wait for the replies */
630         *async_reply = true;
631
632         /* need to keep the control structure around */
633         talloc_steal(state, c);
634
635         /* but we won't wait forever */
636         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
637                         ctdb_persistent_lock_timeout, state);
638
639         return 0;
640 }
641
642
643 /*
644   called when a client has finished a local commit in a transaction to 
645   a persistent database
646  */
647 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
648                                      struct ctdb_req_control *c)
649 {
650         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
651         struct ctdb_db_context *ctdb_db;
652
653         ctdb_db = find_ctdb_db(ctdb, client->db_id);
654         if (ctdb_db == NULL) {
655                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
656                                  "Unknown database 0x%08x\n", client->db_id));
657                 return -1;
658         }
659         if (!ctdb_db->transaction_active) {
660                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
661                                  "Database 0x%08x has no transaction commit "
662                                  "started\n", client->db_id));
663                 return -1;
664         }
665
666         ctdb_db->transaction_active = false;
667         client->db_id = 0;
668
669         if (client->num_persistent_updates == 0) {
670                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
671                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
672                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
673                 return -1;
674         }
675         client->num_persistent_updates--;
676
677         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
678                             "transaction commit db_id[0x%08x]\n",
679                             client->client_id, ctdb_db->db_id));
680
681         return 0;
682 }
683
684 /*
685   called when a client gets an error committing its database
686   during a transaction commit
687  */
688 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
689                                   struct ctdb_req_control *c)
690 {
691         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
692         struct ctdb_db_context *ctdb_db;
693
694         ctdb_db = find_ctdb_db(ctdb, client->db_id);
695         if (ctdb_db == NULL) {
696                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
697                                  "Unknown database 0x%08x\n", client->db_id));
698                 return -1;
699         }
700         if (!ctdb_db->transaction_active) {
701                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
702                                  "Database 0x%08x has no transaction commit "
703                                  "started\n", client->db_id));
704                 return -1;
705         }
706
707         ctdb_db->transaction_active = false;
708         client->db_id = 0;
709
710         if (client->num_persistent_updates == 0) {
711                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
712         } else {
713                 client->num_persistent_updates--;
714         }
715
716         DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
717                          " db_id[0x%08x] - forcing recovery\n",
718                          ctdb_db->db_id));
719         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
720
721         return 0;
722 }
723
724 /**
725  * Tell whether a transaction is active on this node on the give DB.
726  */
727 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
728                                    struct ctdb_req_control *c,
729                                    uint32_t db_id)
730 {
731         struct ctdb_db_context *ctdb_db;
732         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
733
734         ctdb_db = find_ctdb_db(ctdb, db_id);
735         if (!ctdb_db) {
736                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
737                 return -1;
738         }
739
740         if (client->db_id == db_id) {
741                 return 0;
742         }
743
744         if (ctdb_db->transaction_active) {
745                 return 1;
746         } else {
747                 return 0;
748         }
749 }
750
751 /*
752   backwards compatibility:
753
754   start a persistent store operation. passing both the key, header and
755   data to the daemon. If the client disconnects before it has issued
756   a persistent_update call to the daemon we trigger a full recovery
757   to ensure the databases are brought back in sync.
758   for now we ignore the recdata that the client has passed to us.
759  */
760 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
761                                       struct ctdb_req_control *c,
762                                       TDB_DATA recdata)
763 {
764         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
765
766         if (client == NULL) {
767                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
768                 return -1;
769         }
770
771         client->num_persistent_updates++;
772
773         return 0;
774 }
775
776 /* 
777   backwards compatibility:
778
779   called to tell ctdbd that it is no longer doing a persistent update 
780 */
781 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
782                                               struct ctdb_req_control *c,
783                                               TDB_DATA recdata)
784 {
785         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
786
787         if (client == NULL) {
788                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
789                 return -1;
790         }
791
792         if (client->num_persistent_updates > 0) {
793                 client->num_persistent_updates--;
794         }
795
796         return 0;
797 }
798
799
800 /*
801   backwards compatibility:
802
803   single record varient of ctdb_control_trans2_commit for older clients
804  */
805 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
806                                       struct ctdb_req_control *c, 
807                                       TDB_DATA recdata, bool *async_reply)
808 {
809         struct ctdb_marshall_buffer *m;
810         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
811         TDB_DATA key, data;
812
813         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
814             rec->keylen + rec->datalen) {
815                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
816                 return -1;
817         }
818
819         key.dptr = &rec->data[0];
820         key.dsize = rec->keylen;
821         data.dptr = &rec->data[rec->keylen];
822         data.dsize = rec->datalen;
823
824         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
825         CTDB_NO_MEMORY(ctdb, m);
826
827         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
828 }
829
830 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
831                                   uint32_t db_id,
832                                   uint64_t *seqnum)
833 {
834         int32_t ret;
835         struct ctdb_db_context *ctdb_db;
836         const char *keyname = CTDB_DB_SEQNUM_KEY;
837         TDB_DATA key;
838         TDB_DATA data;
839         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
840
841         ctdb_db = find_ctdb_db(ctdb, db_id);
842         if (!ctdb_db) {
843                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
844                 ret = -1;
845                 goto done;
846         }
847
848         key.dptr = (uint8_t *)discard_const(keyname);
849         key.dsize = strlen(keyname) + 1;
850
851         ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
852         if (ret != 0) {
853                 goto done;
854         }
855
856         if (data.dsize != sizeof(uint64_t)) {
857                 *seqnum = 0;
858                 goto done;
859         }
860
861         *seqnum = *(uint64_t *)data.dptr;
862
863 done:
864         talloc_free(mem_ctx);
865         return ret;
866 }
867
868 /**
869  * Get the sequence number of a persistent database.
870  */
871 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
872                                    TDB_DATA indata,
873                                    TDB_DATA *outdata)
874 {
875         uint32_t db_id;
876         int32_t ret;
877         uint64_t seqnum;
878
879         db_id = *(uint32_t *)indata.dptr;
880         ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
881         if (ret != 0) {
882                 goto done;
883         }
884
885         outdata->dsize = sizeof(uint64_t);
886         outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
887         if (outdata->dptr == NULL) {
888                 ret = -1;
889                 goto done;
890         }
891
892         *(outdata->dptr) = seqnum;
893
894 done:
895         return ret;
896 }