f9a20510c989f40b1a2c432f827e97ae5443a331
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  bookkeeping for one commit that is being rolled out to other nodes
  with CTDB_CONTROL_UPDATE_RECORD
 */
struct ctdb_persistent_state {
        /* daemon context used when the reply is sent */
        struct ctdb_context *ctdb;
        /* the control from the client that we still have to answer */
        struct ctdb_req_control *c;
        /* error message recorded from the most recent failed reply */
        const char *errormsg;
        /* number of nodes that have not answered yet */
        uint32_t num_pending;
        /* status recorded from the most recent failed reply */
        int32_t status;
        /* number of nodes that answered with a non-zero status */
        uint32_t num_failed;
        /* number of nodes the update was sent to */
        uint32_t num_sent;
};
37
38 /*
39   1) all nodes fail, and all nodes reply
40   2) some nodes fail, all nodes reply
41   3) some nodes timeout
42   4) all nodes succeed
43  */
44
45 /*
46   called when a node has acknowledged a ctdb_control_update_record call
47  */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49                                      int32_t status, TDB_DATA data, 
50                                      const char *errormsg,
51                                      void *private_data)
52 {
53         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
54                                                               struct ctdb_persistent_state);
55
56         if (status != 0) {
57                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
58                          status, errormsg));
59                 state->status = status;
60                 state->errormsg = errormsg;
61                 state->num_failed++;
62         }
63         state->num_pending--;
64         if (state->num_pending == 0) {
65                 enum ctdb_trans2_commit_error etype;
66                 if (state->num_failed == state->num_sent) {
67                         etype = CTDB_TRANS2_COMMIT_ALLFAIL;
68                 } else if (state->num_failed != 0) {
69                         etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
70                 } else {
71                         etype = CTDB_TRANS2_COMMIT_SUCCESS;
72                 }
73                 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
74                 talloc_free(state);
75         }
76 }
77
78 /*
79   called if persistent store times out
80  */
81 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
82                                          struct timeval t, void *private_data)
83 {
84         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
85         
86         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
87                                    "timeout in ctdb_persistent_state");
88
89         talloc_free(state);
90 }
91
92 /*
93   store a set of persistent records - called from a ctdb client when it has updated
94   some records in a persistent database. The client will have the record
95   locked for the duration of this call. The client is the dmaster when 
96   this call is made
97  */
98 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
99                                    struct ctdb_req_control *c, 
100                                    TDB_DATA recdata, bool *async_reply)
101 {
102         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
103         struct ctdb_persistent_state *state;
104         int i;
105         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
106         struct ctdb_db_context *ctdb_db;
107
108         ctdb_db = find_ctdb_db(ctdb, m->db_id);
109         if (ctdb_db == NULL) {
110                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
111                                  "Unknown database db_id[0x%08x]\n", m->db_id));
112                 return -1;
113         }
114
115         if (client == NULL) {
116                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
117                 return -1;
118         }
119
120         if (ctdb_db->unhealthy_reason) {
121                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
122                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
123                 return -1;
124         }
125
126         /* handling num_persistent_updates is a bit strange - 
127            there are 3 cases
128              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
129                 They don't expect num_persistent_updates to be used at all
130
131              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
132                 this commit to then decrement it
133
134              3) new clients which use TRANS2 commit functions, and
135                 expect this function to increment the counter, and
136                 then have it decremented in ctdb_control_trans2_error
137                 or ctdb_control_trans2_finished
138         */
139         switch (c->opcode) {
140         case CTDB_CONTROL_PERSISTENT_STORE:
141                 if (ctdb_db->transaction_active) {
142                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
143                                           "transaction is active on database "
144                                           "db_id[0x%08x] - refusing persistent "
145                                          " store for client id[0x%08x]\n",
146                                           ctdb_db->db_id, client->client_id));
147                         return -1;
148                 }
149                 if (client->num_persistent_updates > 0) {
150                         client->num_persistent_updates--;
151                 }
152                 break;
153         case CTDB_CONTROL_TRANS2_COMMIT:
154                 if (ctdb_db->transaction_active) {
155                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
156                                          " already a transaction commit "
157                                          "active on db_id[0x%08x] - forbidding "
158                                          "client_id[0x%08x] to commit\n",
159                                          ctdb_db->db_id, client->client_id));
160                         return -1;
161                 }
162                 if (client->db_id != 0) {
163                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
164                                          "client-db_id[0x%08x] != 0 "
165                                          "(client_id[0x%08x])\n",
166                                          client->db_id, client->client_id));
167                         return -1;
168                 }
169                 client->num_persistent_updates++;
170                 ctdb_db->transaction_active = true;
171                 client->db_id = m->db_id;
172                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
173                                   " commit transaction on db id[0x%08x]\n",
174                                   client->client_id, client->db_id));
175                 break;
176         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
177                 /* already updated from the first commit */
178                 if (client->db_id != m->db_id) {
179                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
180                                          "retry: client-db_id[0x%08x] != "
181                                          "db_id[0x%08x] (client_id[0x%08x])\n",
182                                          client->db_id,
183                                          m->db_id, client->client_id));
184                         return -1;
185                 }
186                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
187                                     "transaction commit retry on "
188                                     "db_id[0x%08x]\n",
189                                     client->client_id, client->db_id));
190                 break;
191         }
192
193         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
194                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
195                 return -1;
196         }
197
198         state = talloc_zero(ctdb, struct ctdb_persistent_state);
199         CTDB_NO_MEMORY(ctdb, state);
200
201         state->ctdb = ctdb;
202         state->c    = c;
203
204         for (i=0;i<ctdb->vnn_map->size;i++) {
205                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
206                 int ret;
207
208                 /* only send to active nodes */
209                 if (node->flags & NODE_FLAGS_INACTIVE) {
210                         continue;
211                 }
212
213                 /* don't send to ourselves */
214                 if (node->pnn == ctdb->pnn) {
215                         continue;
216                 }
217                 
218                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
219                                                c->client_id, 0, recdata, 
220                                                ctdb_persistent_callback, state);
221                 if (ret == -1) {
222                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
223                         talloc_free(state);
224                         return -1;
225                 }
226
227                 state->num_pending++;
228                 state->num_sent++;
229         }
230
231         if (state->num_pending == 0) {
232                 talloc_free(state);
233                 return 0;
234         }
235         
236         /* we need to wait for the replies */
237         *async_reply = true;
238
239         /* need to keep the control structure around */
240         talloc_steal(state, c);
241
242         /* but we won't wait forever */
243         event_add_timed(ctdb->ev, state, 
244                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
245                         ctdb_persistent_store_timeout, state);
246
247         return 0;
248 }
249
250
251 /*
252  * Store a set of persistent records.
253  * This is used to roll out a transaction to all nodes.
254  */
255 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
256                                    struct ctdb_req_control *c,
257                                    TDB_DATA recdata, bool *async_reply)
258 {
259         struct ctdb_client *client;
260         struct ctdb_persistent_state *state;
261         int i;
262         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
263         struct ctdb_db_context *ctdb_db;
264
265         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
266                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
267                 return -1;
268         }
269
270         ctdb_db = find_ctdb_db(ctdb, m->db_id);
271         if (ctdb_db == NULL) {
272                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
273                                  "Unknown database db_id[0x%08x]\n", m->db_id));
274                 return -1;
275         }
276
277         client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
278         if (client == NULL) {
279                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
280                                  "to a client. Returning error\n"));
281                 return -1;
282         }
283
284         state = talloc_zero(ctdb, struct ctdb_persistent_state);
285         CTDB_NO_MEMORY(ctdb, state);
286
287         state->ctdb = ctdb;
288         state->c    = c;
289
290         for (i = 0; i < ctdb->vnn_map->size; i++) {
291                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
292                 int ret;
293
294                 /* only send to active nodes */
295                 if (node->flags & NODE_FLAGS_INACTIVE) {
296                         continue;
297                 }
298
299                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
300                                                CTDB_CONTROL_UPDATE_RECORD,
301                                                c->client_id, 0, recdata,
302                                                ctdb_persistent_callback,
303                                                state);
304                 if (ret == -1) {
305                         DEBUG(DEBUG_ERR,("Unable to send "
306                                          "CTDB_CONTROL_UPDATE_RECORD "
307                                          "to pnn %u\n", node->pnn));
308                         talloc_free(state);
309                         return -1;
310                 }
311
312                 state->num_pending++;
313                 state->num_sent++;
314         }
315
316         if (state->num_pending == 0) {
317                 talloc_free(state);
318                 return 0;
319         }
320
321         /* we need to wait for the replies */
322         *async_reply = true;
323
324         /* need to keep the control structure around */
325         talloc_steal(state, c);
326
327         /* but we won't wait forever */
328         event_add_timed(ctdb->ev, state,
329                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
330                         ctdb_persistent_store_timeout, state);
331
332         return 0;
333 }
334
335
/*
  everything needed to apply one CTDB_CONTROL_UPDATE_RECORD locally
  and to answer it afterwards
 */
struct ctdb_persistent_write_state {
        /* database the records are written to */
        struct ctdb_db_context *ctdb_db;

        /* marshalled records taken from the control's payload */
        struct ctdb_marshall_buffer *m;

        /* the control that is answered once the write is done */
        struct ctdb_req_control *c;
};
341
342
343 /*
344   called from a child process to write the data
345  */
346 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
347 {
348         int ret, i;
349         struct ctdb_rec_data *rec = NULL;
350         struct ctdb_marshall_buffer *m = state->m;
351
352         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
353         if (ret == -1) {
354                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
355                                  state->ctdb_db->db_id));
356                 return -1;
357         }
358
359         for (i=0;i<m->count;i++) {
360                 struct ctdb_ltdb_header oldheader;
361                 struct ctdb_ltdb_header header;
362                 TDB_DATA key, data, olddata;
363                 TALLOC_CTX *tmp_ctx = talloc_new(state);
364
365                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
366                 
367                 if (rec == NULL) {
368                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
369                                          i, state->ctdb_db->db_id));
370                         talloc_free(tmp_ctx);
371                         goto failed;                    
372                 }
373
374                 /* fetch the old header and ensure the rsn is less than the new rsn */
375                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
376                 if (ret != 0) {
377                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
378                                          state->ctdb_db->db_id));
379                         talloc_free(tmp_ctx);
380                         goto failed;
381                 }
382
383                 if (oldheader.rsn >= header.rsn &&
384                     (olddata.dsize != data.dsize || 
385                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
386                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
387                                           state->ctdb_db->db_id, 
388                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
389                         talloc_free(tmp_ctx);
390                         goto failed;
391                 }
392
393                 talloc_free(tmp_ctx);
394
395                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
396                 if (ret != 0) {
397                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
398                                           state->ctdb_db->db_id));
399                         goto failed;
400                 }
401         }
402
403         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
404         if (ret == -1) {
405                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
406                                  state->ctdb_db->db_id));
407                 return -1;
408         }
409
410         return 0;
411         
412 failed:
413         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
414         return -1;
415 }
416
417
418 /*
419   called when we the child has completed the persistent write
420   on our behalf
421  */
422 static void ctdb_persistent_write_callback(int status, void *private_data)
423 {
424         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
425                                                                    struct ctdb_persistent_write_state);
426
427
428         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
429
430         talloc_free(state);
431 }
432
433 /*
434   called if our lockwait child times out
435  */
436 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
437                                          struct timeval t, void *private_data)
438 {
439         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
440                                                                    struct ctdb_persistent_write_state);
441         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
442         talloc_free(state);
443 }
444
/*
  handle for one child process that writes records on our behalf
 */
struct childwrite_handle {
        /* daemon context, used for the statistics counters */
        struct ctdb_context *ctdb;
        /* database the child writes to */
        struct ctdb_db_context *ctdb_db;
        /* fd event watching the read end of the pipe */
        struct fd_event *fde;
        /* pipe from the child: the parent reads fd[0], the child writes fd[1] */
        int fd[2];
        /* pid of the child process */
        pid_t child;
        /* argument passed to callback */
        void *private_data;
        /* invoked with the child's status byte once it has reported */
        void (*callback)(int, void *);
        /* when the handle was set up, used for the latency statistics */
        struct timeval start_time;
};
455
456 static int childwrite_destructor(struct childwrite_handle *h)
457 {
458         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
459         kill(h->child, SIGKILL);
460         return 0;
461 }
462
463 /* called when the child process has finished writing the record to the
464    database
465 */
466 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
467                              uint16_t flags, void *private_data)
468 {
469         struct childwrite_handle *h = talloc_get_type(private_data, 
470                                                      struct childwrite_handle);
471         void *p = h->private_data;
472         void (*callback)(int, void *) = h->callback;
473         pid_t child = h->child;
474         TALLOC_CTX *tmp_ctx = talloc_new(ev);
475         int ret;
476         char c;
477
478         CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
479         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
480
481         /* the handle needs to go away when the context is gone - when
482            the handle goes away this implicitly closes the pipe, which
483            kills the child */
484         talloc_steal(tmp_ctx, h);
485
486         talloc_set_destructor(h, NULL);
487
488         ret = read(h->fd[0], &c, 1);
489         if (ret < 1) {
490                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
491                 c = 1;
492         }
493
494         callback(c, p);
495
496         kill(child, SIGKILL);
497         talloc_free(tmp_ctx);
498 }
499
500 /* this creates a child process which will take out a tdb transaction
501    and write the record to the database.
502 */
503 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
504                                 void (*callback)(int, void *private_data),
505                                 struct ctdb_persistent_write_state *state)
506 {
507         struct childwrite_handle *result;
508         int ret;
509         pid_t parent = getpid();
510
511         CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
512         CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
513
514         if (!(result = talloc_zero(state, struct childwrite_handle))) {
515                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
516                 return NULL;
517         }
518
519         ret = pipe(result->fd);
520
521         if (ret != 0) {
522                 talloc_free(result);
523                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
524                 return NULL;
525         }
526
527         result->child = ctdb_fork(ctdb_db->ctdb);
528
529         if (result->child == (pid_t)-1) {
530                 close(result->fd[0]);
531                 close(result->fd[1]);
532                 talloc_free(result);
533                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
534                 return NULL;
535         }
536
537         result->callback = callback;
538         result->private_data = state;
539         result->ctdb = ctdb_db->ctdb;
540         result->ctdb_db = ctdb_db;
541
542         if (result->child == 0) {
543                 char c = 0;
544
545                 close(result->fd[0]);
546                 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
547                 ret = ctdb_persistent_store(state);
548                 if (ret != 0) {
549                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
550                         c = 1;
551                 }
552
553                 write(result->fd[1], &c, 1);
554
555                 /* make sure we die when our parent dies */
556                 while (kill(parent, 0) == 0 || errno != ESRCH) {
557                         sleep(5);
558                 }
559                 _exit(0);
560         }
561
562         close(result->fd[1]);
563         set_close_on_exec(result->fd[0]);
564
565         talloc_set_destructor(result, childwrite_destructor);
566
567         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
568
569         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
570                                    EVENT_FD_READ, childwrite_handler,
571                                    (void *)result);
572         if (result->fde == NULL) {
573                 talloc_free(result);
574                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
575                 return NULL;
576         }
577         tevent_fd_set_auto_close(result->fde);
578
579         result->start_time = timeval_current();
580
581         return result;
582 }
583
584 /* 
585    update a record on this node if the new record has a higher rsn than the
586    current record
587  */
588 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
589                                    struct ctdb_req_control *c, TDB_DATA recdata, 
590                                    bool *async_reply)
591 {
592         struct ctdb_db_context *ctdb_db;
593         struct ctdb_persistent_write_state *state;
594         struct childwrite_handle *handle;
595         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
596
597         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
598                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
599                 return -1;
600         }
601
602         ctdb_db = find_ctdb_db(ctdb, m->db_id);
603         if (ctdb_db == NULL) {
604                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
605                 return -1;
606         }
607
608         if (ctdb_db->unhealthy_reason) {
609                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
610                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
611                 return -1;
612         }
613
614         state = talloc(ctdb, struct ctdb_persistent_write_state);
615         CTDB_NO_MEMORY(ctdb, state);
616
617         state->ctdb_db = ctdb_db;
618         state->c       = c;
619         state->m       = m;
620
621         /* create a child process to take out a transaction and 
622            write the data.
623         */
624         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
625         if (handle == NULL) {
626                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
627                 talloc_free(state);
628                 return -1;
629         }
630
631         /* we need to wait for the replies */
632         *async_reply = true;
633
634         /* need to keep the control structure around */
635         talloc_steal(state, c);
636
637         /* but we won't wait forever */
638         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
639                         ctdb_persistent_lock_timeout, state);
640
641         return 0;
642 }
643
644
645 /*
646   called when a client has finished a local commit in a transaction to 
647   a persistent database
648  */
649 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
650                                      struct ctdb_req_control *c)
651 {
652         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
653         struct ctdb_db_context *ctdb_db;
654
655         ctdb_db = find_ctdb_db(ctdb, client->db_id);
656         if (ctdb_db == NULL) {
657                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
658                                  "Unknown database 0x%08x\n", client->db_id));
659                 return -1;
660         }
661         if (!ctdb_db->transaction_active) {
662                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
663                                  "Database 0x%08x has no transaction commit "
664                                  "started\n", client->db_id));
665                 return -1;
666         }
667
668         ctdb_db->transaction_active = false;
669         client->db_id = 0;
670
671         if (client->num_persistent_updates == 0) {
672                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
673                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
674                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
675                 return -1;
676         }
677         client->num_persistent_updates--;
678
679         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
680                             "transaction commit db_id[0x%08x]\n",
681                             client->client_id, ctdb_db->db_id));
682
683         return 0;
684 }
685
686 /*
687   called when a client gets an error committing its database
688   during a transaction commit
689  */
690 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
691                                   struct ctdb_req_control *c)
692 {
693         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
694         struct ctdb_db_context *ctdb_db;
695
696         ctdb_db = find_ctdb_db(ctdb, client->db_id);
697         if (ctdb_db == NULL) {
698                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
699                                  "Unknown database 0x%08x\n", client->db_id));
700                 return -1;
701         }
702         if (!ctdb_db->transaction_active) {
703                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
704                                  "Database 0x%08x has no transaction commit "
705                                  "started\n", client->db_id));
706                 return -1;
707         }
708
709         ctdb_db->transaction_active = false;
710         client->db_id = 0;
711
712         if (client->num_persistent_updates == 0) {
713                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
714         } else {
715                 client->num_persistent_updates--;
716         }
717
718         DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
719                          " db_id[0x%08x] - forcing recovery\n",
720                          ctdb_db->db_id));
721         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
722
723         return 0;
724 }
725
726 /**
727  * Tell whether a transaction is active on this node on the give DB.
728  */
729 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
730                                    struct ctdb_req_control *c,
731                                    uint32_t db_id)
732 {
733         struct ctdb_db_context *ctdb_db;
734         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
735
736         ctdb_db = find_ctdb_db(ctdb, db_id);
737         if (!ctdb_db) {
738                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
739                 return -1;
740         }
741
742         if (client->db_id == db_id) {
743                 return 0;
744         }
745
746         if (ctdb_db->transaction_active) {
747                 return 1;
748         } else {
749                 return 0;
750         }
751 }
752
753 /*
754   backwards compatibility:
755
756   start a persistent store operation. passing both the key, header and
757   data to the daemon. If the client disconnects before it has issued
758   a persistent_update call to the daemon we trigger a full recovery
759   to ensure the databases are brought back in sync.
760   for now we ignore the recdata that the client has passed to us.
761  */
762 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
763                                       struct ctdb_req_control *c,
764                                       TDB_DATA recdata)
765 {
766         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
767
768         if (client == NULL) {
769                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
770                 return -1;
771         }
772
773         client->num_persistent_updates++;
774
775         return 0;
776 }
777
778 /* 
779   backwards compatibility:
780
781   called to tell ctdbd that it is no longer doing a persistent update 
782 */
783 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
784                                               struct ctdb_req_control *c,
785                                               TDB_DATA recdata)
786 {
787         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
788
789         if (client == NULL) {
790                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
791                 return -1;
792         }
793
794         if (client->num_persistent_updates > 0) {
795                 client->num_persistent_updates--;
796         }
797
798         return 0;
799 }
800
801
802 /*
803   backwards compatibility:
804
805   single record varient of ctdb_control_trans2_commit for older clients
806  */
807 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
808                                       struct ctdb_req_control *c, 
809                                       TDB_DATA recdata, bool *async_reply)
810 {
811         struct ctdb_marshall_buffer *m;
812         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
813         TDB_DATA key, data;
814
815         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
816             rec->keylen + rec->datalen) {
817                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
818                 return -1;
819         }
820
821         key.dptr = &rec->data[0];
822         key.dsize = rec->keylen;
823         data.dptr = &rec->data[rec->keylen];
824         data.dsize = rec->datalen;
825
826         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
827         CTDB_NO_MEMORY(ctdb, m);
828
829         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
830 }
831
832 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
833                                   uint32_t db_id,
834                                   uint64_t *seqnum)
835 {
836         int32_t ret;
837         struct ctdb_db_context *ctdb_db;
838         const char *keyname = CTDB_DB_SEQNUM_KEY;
839         TDB_DATA key;
840         TDB_DATA data;
841         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
842
843         ctdb_db = find_ctdb_db(ctdb, db_id);
844         if (!ctdb_db) {
845                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
846                 ret = -1;
847                 goto done;
848         }
849
850         key.dptr = (uint8_t *)discard_const(keyname);
851         key.dsize = strlen(keyname) + 1;
852
853         ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
854         if (ret != 0) {
855                 goto done;
856         }
857
858         if (data.dsize != sizeof(uint64_t)) {
859                 *seqnum = 0;
860                 goto done;
861         }
862
863         *seqnum = *(uint64_t *)data.dptr;
864
865 done:
866         talloc_free(mem_ctx);
867         return ret;
868 }
869
870 /**
871  * Get the sequence number of a persistent database.
872  */
873 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
874                                    TDB_DATA indata,
875                                    TDB_DATA *outdata)
876 {
877         uint32_t db_id;
878         int32_t ret;
879         uint64_t seqnum;
880
881         db_id = *(uint32_t *)indata.dptr;
882         ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
883         if (ret != 0) {
884                 goto done;
885         }
886
887         outdata->dsize = sizeof(uint64_t);
888         outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
889         if (outdata->dptr == NULL) {
890                 ret = -1;
891                 goto done;
892         }
893
894         *(outdata->dptr) = seqnum;
895
896 done:
897         return ret;
898 }