rename ctdb.h to ctdb_protocol.h
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "include/ctdb_protocol.h"
28 #include "include/ctdb_private.h"
29
/*
  bookkeeping for one commit that is being pushed to the other nodes as
  CTDB_CONTROL_UPDATE_RECORD controls
 */
struct ctdb_persistent_state {
        /* daemon context used when the reply is finally sent */
        struct ctdb_context *ctdb;
        /* the originating control; answered once all replies are in */
        struct ctdb_req_control *c;
        /* error message of the most recent failed reply */
        const char *errormsg;
        /* controls that were sent and have not been answered yet */
        uint32_t num_pending;
        /* status of the most recent failed reply */
        int32_t status;
        /* number of replies that carried a non-zero status */
        uint32_t num_failed;
        /* number of controls that were sent */
        uint32_t num_sent;
};
38
39 /*
40   1) all nodes fail, and all nodes reply
41   2) some nodes fail, all nodes reply
42   3) some nodes timeout
43   4) all nodes succeed
44  */
45
46 /*
47   called when a node has acknowledged a ctdb_control_update_record call
48  */
49 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
50                                      int32_t status, TDB_DATA data, 
51                                      const char *errormsg,
52                                      void *private_data)
53 {
54         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
55                                                               struct ctdb_persistent_state);
56
57         if (status != 0) {
58                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
59                          status, errormsg));
60                 state->status = status;
61                 state->errormsg = errormsg;
62                 state->num_failed++;
63         }
64         state->num_pending--;
65         if (state->num_pending == 0) {
66                 enum ctdb_trans2_commit_error etype;
67                 if (state->num_failed == state->num_sent) {
68                         etype = CTDB_TRANS2_COMMIT_ALLFAIL;
69                 } else if (state->num_failed != 0) {
70                         etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
71                 } else {
72                         etype = CTDB_TRANS2_COMMIT_SUCCESS;
73                 }
74                 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
75                 talloc_free(state);
76         }
77 }
78
79 /*
80   called if persistent store times out
81  */
82 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
83                                          struct timeval t, void *private_data)
84 {
85         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
86         
87         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
88                                    "timeout in ctdb_persistent_state");
89
90         talloc_free(state);
91 }
92
93 /*
94   store a set of persistent records - called from a ctdb client when it has updated
95   some records in a persistent database. The client will have the record
96   locked for the duration of this call. The client is the dmaster when 
97   this call is made
98  */
99 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
100                                    struct ctdb_req_control *c, 
101                                    TDB_DATA recdata, bool *async_reply)
102 {
103         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
104         struct ctdb_persistent_state *state;
105         int i;
106         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
107         struct ctdb_db_context *ctdb_db;
108
109         ctdb_db = find_ctdb_db(ctdb, m->db_id);
110         if (ctdb_db == NULL) {
111                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
112                                  "Unknown database db_id[0x%08x]\n", m->db_id));
113                 return -1;
114         }
115
116         if (client == NULL) {
117                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
118                 return -1;
119         }
120
121         if (ctdb_db->unhealthy_reason) {
122                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
123                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
124                 return -1;
125         }
126
127         /* handling num_persistent_updates is a bit strange - 
128            there are 3 cases
129              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
130                 They don't expect num_persistent_updates to be used at all
131
132              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
133                 this commit to then decrement it
134
135              3) new clients which use TRANS2 commit functions, and
136                 expect this function to increment the counter, and
137                 then have it decremented in ctdb_control_trans2_error
138                 or ctdb_control_trans2_finished
139         */
140         switch (c->opcode) {
141         case CTDB_CONTROL_PERSISTENT_STORE:
142                 if (ctdb_db->transaction_active) {
143                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
144                                           "transaction is active on database "
145                                           "db_id[0x%08x] - refusing persistent "
146                                          " store for client id[0x%08x]\n",
147                                           ctdb_db->db_id, client->client_id));
148                         return -1;
149                 }
150                 if (client->num_persistent_updates > 0) {
151                         client->num_persistent_updates--;
152                 }
153                 break;
154         case CTDB_CONTROL_TRANS2_COMMIT:
155                 if (ctdb_db->transaction_active) {
156                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
157                                          " already a transaction commit "
158                                          "active on db_id[0x%08x] - forbidding "
159                                          "client_id[0x%08x] to commit\n",
160                                          ctdb_db->db_id, client->client_id));
161                         return -1;
162                 }
163                 if (client->db_id != 0) {
164                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
165                                          "client-db_id[0x%08x] != 0 "
166                                          "(client_id[0x%08x])\n",
167                                          client->db_id, client->client_id));
168                         return -1;
169                 }
170                 client->num_persistent_updates++;
171                 ctdb_db->transaction_active = true;
172                 client->db_id = m->db_id;
173                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
174                                   " commit transaction on db id[0x%08x]\n",
175                                   client->client_id, client->db_id));
176                 break;
177         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
178                 /* already updated from the first commit */
179                 if (client->db_id != m->db_id) {
180                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
181                                          "retry: client-db_id[0x%08x] != "
182                                          "db_id[0x%08x] (client_id[0x%08x])\n",
183                                          client->db_id,
184                                          m->db_id, client->client_id));
185                         return -1;
186                 }
187                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
188                                     "transaction commit retry on "
189                                     "db_id[0x%08x]\n",
190                                     client->client_id, client->db_id));
191                 break;
192         }
193
194         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
195                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
196                 return -1;
197         }
198
199         state = talloc_zero(ctdb, struct ctdb_persistent_state);
200         CTDB_NO_MEMORY(ctdb, state);
201
202         state->ctdb = ctdb;
203         state->c    = c;
204
205         for (i=0;i<ctdb->vnn_map->size;i++) {
206                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
207                 int ret;
208
209                 /* only send to active nodes */
210                 if (node->flags & NODE_FLAGS_INACTIVE) {
211                         continue;
212                 }
213
214                 /* don't send to ourselves */
215                 if (node->pnn == ctdb->pnn) {
216                         continue;
217                 }
218                 
219                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
220                                                c->client_id, 0, recdata, 
221                                                ctdb_persistent_callback, state);
222                 if (ret == -1) {
223                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
224                         talloc_free(state);
225                         return -1;
226                 }
227
228                 state->num_pending++;
229                 state->num_sent++;
230         }
231
232         if (state->num_pending == 0) {
233                 talloc_free(state);
234                 return 0;
235         }
236         
237         /* we need to wait for the replies */
238         *async_reply = true;
239
240         /* need to keep the control structure around */
241         talloc_steal(state, c);
242
243         /* but we won't wait forever */
244         event_add_timed(ctdb->ev, state, 
245                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
246                         ctdb_persistent_store_timeout, state);
247
248         return 0;
249 }
250
251
252 /*
253  * Store a set of persistent records.
254  * This is used to roll out a transaction to all nodes.
255  */
256 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
257                                    struct ctdb_req_control *c,
258                                    TDB_DATA recdata, bool *async_reply)
259 {
260         struct ctdb_client *client;
261         struct ctdb_persistent_state *state;
262         int i;
263         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
264         struct ctdb_db_context *ctdb_db;
265
266         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
267                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
268                 return -1;
269         }
270
271         ctdb_db = find_ctdb_db(ctdb, m->db_id);
272         if (ctdb_db == NULL) {
273                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
274                                  "Unknown database db_id[0x%08x]\n", m->db_id));
275                 return -1;
276         }
277
278         client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
279         if (client == NULL) {
280                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
281                                  "to a client. Returning error\n"));
282                 return -1;
283         }
284
285         state = talloc_zero(ctdb, struct ctdb_persistent_state);
286         CTDB_NO_MEMORY(ctdb, state);
287
288         state->ctdb = ctdb;
289         state->c    = c;
290
291         for (i = 0; i < ctdb->vnn_map->size; i++) {
292                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
293                 int ret;
294
295                 /* only send to active nodes */
296                 if (node->flags & NODE_FLAGS_INACTIVE) {
297                         continue;
298                 }
299
300                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
301                                                CTDB_CONTROL_UPDATE_RECORD,
302                                                c->client_id, 0, recdata,
303                                                ctdb_persistent_callback,
304                                                state);
305                 if (ret == -1) {
306                         DEBUG(DEBUG_ERR,("Unable to send "
307                                          "CTDB_CONTROL_UPDATE_RECORD "
308                                          "to pnn %u\n", node->pnn));
309                         talloc_free(state);
310                         return -1;
311                 }
312
313                 state->num_pending++;
314                 state->num_sent++;
315         }
316
317         if (state->num_pending == 0) {
318                 talloc_free(state);
319                 return 0;
320         }
321
322         /* we need to wait for the replies */
323         *async_reply = true;
324
325         /* need to keep the control structure around */
326         talloc_steal(state, c);
327
328         /* but we won't wait forever */
329         event_add_timed(ctdb->ev, state,
330                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
331                         ctdb_persistent_store_timeout, state);
332
333         return 0;
334 }
335
336
/*
  everything the write child and its completion/timeout handlers need for
  one CTDB_CONTROL_UPDATE_RECORD request
 */
struct ctdb_persistent_write_state {
        /* database the records are written to */
        struct ctdb_db_context *ctdb_db;

        /* marshalled records to store */
        struct ctdb_marshall_buffer *m;

        /* the control that is answered when the write has completed */
        struct ctdb_req_control *c;
};
342
343
344 /*
345   called from a child process to write the data
346  */
347 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
348 {
349         int ret, i;
350         struct ctdb_rec_data *rec = NULL;
351         struct ctdb_marshall_buffer *m = state->m;
352
353         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
354         if (ret == -1) {
355                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
356                                  state->ctdb_db->db_id));
357                 return -1;
358         }
359
360         for (i=0;i<m->count;i++) {
361                 struct ctdb_ltdb_header oldheader;
362                 struct ctdb_ltdb_header header;
363                 TDB_DATA key, data, olddata;
364                 TALLOC_CTX *tmp_ctx = talloc_new(state);
365
366                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
367                 
368                 if (rec == NULL) {
369                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
370                                          i, state->ctdb_db->db_id));
371                         talloc_free(tmp_ctx);
372                         goto failed;                    
373                 }
374
375                 /* fetch the old header and ensure the rsn is less than the new rsn */
376                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
377                 if (ret != 0) {
378                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
379                                          state->ctdb_db->db_id));
380                         talloc_free(tmp_ctx);
381                         goto failed;
382                 }
383
384                 if (oldheader.rsn >= header.rsn &&
385                     (olddata.dsize != data.dsize || 
386                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
387                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
388                                           state->ctdb_db->db_id, 
389                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
390                         talloc_free(tmp_ctx);
391                         goto failed;
392                 }
393
394                 talloc_free(tmp_ctx);
395
396                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
397                 if (ret != 0) {
398                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
399                                           state->ctdb_db->db_id));
400                         goto failed;
401                 }
402         }
403
404         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
405         if (ret == -1) {
406                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
407                                  state->ctdb_db->db_id));
408                 return -1;
409         }
410
411         return 0;
412         
413 failed:
414         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
415         return -1;
416 }
417
418
419 /*
420   called when we the child has completed the persistent write
421   on our behalf
422  */
423 static void ctdb_persistent_write_callback(int status, void *private_data)
424 {
425         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
426                                                                    struct ctdb_persistent_write_state);
427
428
429         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
430
431         talloc_free(state);
432 }
433
434 /*
435   called if our lockwait child times out
436  */
437 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
438                                          struct timeval t, void *private_data)
439 {
440         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
441                                                                    struct ctdb_persistent_write_state);
442         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
443         talloc_free(state);
444 }
445
/*
  handle for one forked write child and the pipe it reports its result on
 */
struct childwrite_handle {
        /* daemon context; its statistics counters are updated */
        struct ctdb_context *ctdb;
        /* database the child writes to */
        struct ctdb_db_context *ctdb_db;
        /* fd event watching the read end of the pipe */
        struct fd_event *fde;
        /* pipe: the parent reads fd[0], the child writes fd[1] */
        int fd[2];
        /* pid of the forked child */
        pid_t child;
        /* opaque argument handed back to callback */
        void *private_data;
        /* invoked with the result byte read from the child */
        void (*callback)(int, void *);
        /* when the child was started, used for latency accounting */
        struct timeval start_time;
};
456
457 static int childwrite_destructor(struct childwrite_handle *h)
458 {
459         h->ctdb->statistics.pending_childwrite_calls--;
460         kill(h->child, SIGKILL);
461         return 0;
462 }
463
464 /* called when the child process has finished writing the record to the
465    database
466 */
467 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
468                              uint16_t flags, void *private_data)
469 {
470         struct childwrite_handle *h = talloc_get_type(private_data, 
471                                                      struct childwrite_handle);
472         void *p = h->private_data;
473         void (*callback)(int, void *) = h->callback;
474         pid_t child = h->child;
475         TALLOC_CTX *tmp_ctx = talloc_new(ev);
476         int ret;
477         char c;
478
479         ctdb_latency(h->ctdb_db, "persistent", &h->ctdb->statistics.max_childwrite_latency, h->start_time);
480         h->ctdb->statistics.pending_childwrite_calls--;
481
482         /* the handle needs to go away when the context is gone - when
483            the handle goes away this implicitly closes the pipe, which
484            kills the child */
485         talloc_steal(tmp_ctx, h);
486
487         talloc_set_destructor(h, NULL);
488
489         ret = read(h->fd[0], &c, 1);
490         if (ret < 1) {
491                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
492                 c = 1;
493         }
494
495         callback(c, p);
496
497         kill(child, SIGKILL);
498         talloc_free(tmp_ctx);
499 }
500
501 /* this creates a child process which will take out a tdb transaction
502    and write the record to the database.
503 */
504 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
505                                 void (*callback)(int, void *private_data),
506                                 struct ctdb_persistent_write_state *state)
507 {
508         struct childwrite_handle *result;
509         int ret;
510         pid_t parent = getpid();
511
512         ctdb_db->ctdb->statistics.childwrite_calls++;
513         ctdb_db->ctdb->statistics.pending_childwrite_calls++;
514
515         if (!(result = talloc_zero(state, struct childwrite_handle))) {
516                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
517                 return NULL;
518         }
519
520         ret = pipe(result->fd);
521
522         if (ret != 0) {
523                 talloc_free(result);
524                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
525                 return NULL;
526         }
527
528         result->child = fork();
529
530         if (result->child == (pid_t)-1) {
531                 close(result->fd[0]);
532                 close(result->fd[1]);
533                 talloc_free(result);
534                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
535                 return NULL;
536         }
537
538         result->callback = callback;
539         result->private_data = state;
540         result->ctdb = ctdb_db->ctdb;
541         result->ctdb_db = ctdb_db;
542
543         if (result->child == 0) {
544                 char c = 0;
545
546                 close(result->fd[0]);
547                 ret = ctdb_persistent_store(state);
548                 if (ret != 0) {
549                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
550                         c = 1;
551                 }
552
553                 write(result->fd[1], &c, 1);
554
555                 /* make sure we die when our parent dies */
556                 while (kill(parent, 0) == 0 || errno != ESRCH) {
557                         sleep(5);
558                 }
559                 _exit(0);
560         }
561
562         close(result->fd[1]);
563         set_close_on_exec(result->fd[0]);
564
565         talloc_set_destructor(result, childwrite_destructor);
566
567         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
568
569         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
570                                    EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
571                                    (void *)result);
572         if (result->fde == NULL) {
573                 talloc_free(result);
574                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
575                 return NULL;
576         }
577
578         result->start_time = timeval_current();
579
580         return result;
581 }
582
583 /* 
584    update a record on this node if the new record has a higher rsn than the
585    current record
586  */
587 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
588                                    struct ctdb_req_control *c, TDB_DATA recdata, 
589                                    bool *async_reply)
590 {
591         struct ctdb_db_context *ctdb_db;
592         struct ctdb_persistent_write_state *state;
593         struct childwrite_handle *handle;
594         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
595
596         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
597                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
598                 return -1;
599         }
600
601         ctdb_db = find_ctdb_db(ctdb, m->db_id);
602         if (ctdb_db == NULL) {
603                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
604                 return -1;
605         }
606
607         if (ctdb_db->unhealthy_reason) {
608                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
609                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
610                 return -1;
611         }
612
613         state = talloc(ctdb, struct ctdb_persistent_write_state);
614         CTDB_NO_MEMORY(ctdb, state);
615
616         state->ctdb_db = ctdb_db;
617         state->c       = c;
618         state->m       = m;
619
620         /* create a child process to take out a transaction and 
621            write the data.
622         */
623         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
624         if (handle == NULL) {
625                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
626                 talloc_free(state);
627                 return -1;
628         }
629
630         /* we need to wait for the replies */
631         *async_reply = true;
632
633         /* need to keep the control structure around */
634         talloc_steal(state, c);
635
636         /* but we won't wait forever */
637         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
638                         ctdb_persistent_lock_timeout, state);
639
640         return 0;
641 }
642
643
644 /*
645   called when a client has finished a local commit in a transaction to 
646   a persistent database
647  */
648 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
649                                      struct ctdb_req_control *c)
650 {
651         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
652         struct ctdb_db_context *ctdb_db;
653
654         ctdb_db = find_ctdb_db(ctdb, client->db_id);
655         if (ctdb_db == NULL) {
656                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
657                                  "Unknown database 0x%08x\n", client->db_id));
658                 return -1;
659         }
660         if (!ctdb_db->transaction_active) {
661                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
662                                  "Database 0x%08x has no transaction commit "
663                                  "started\n", client->db_id));
664                 return -1;
665         }
666
667         ctdb_db->transaction_active = false;
668         client->db_id = 0;
669
670         if (client->num_persistent_updates == 0) {
671                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
672                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
673                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
674                 return -1;
675         }
676         client->num_persistent_updates--;
677
678         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
679                             "transaction commit db_id[0x%08x]\n",
680                             client->client_id, ctdb_db->db_id));
681
682         return 0;
683 }
684
685 /*
686   called when a client gets an error committing its database
687   during a transaction commit
688  */
689 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
690                                   struct ctdb_req_control *c)
691 {
692         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
693         struct ctdb_db_context *ctdb_db;
694
695         ctdb_db = find_ctdb_db(ctdb, client->db_id);
696         if (ctdb_db == NULL) {
697                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
698                                  "Unknown database 0x%08x\n", client->db_id));
699                 return -1;
700         }
701         if (!ctdb_db->transaction_active) {
702                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
703                                  "Database 0x%08x has no transaction commit "
704                                  "started\n", client->db_id));
705                 return -1;
706         }
707
708         ctdb_db->transaction_active = false;
709         client->db_id = 0;
710
711         if (client->num_persistent_updates == 0) {
712                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
713         } else {
714                 client->num_persistent_updates--;
715         }
716
717         DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
718                          " db_id[0x%08x] - forcing recovery\n",
719                          ctdb_db->db_id));
720         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
721
722         return 0;
723 }
724
725 /**
726  * Tell whether a transaction is active on this node on the give DB.
727  */
728 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
729                                    struct ctdb_req_control *c,
730                                    uint32_t db_id)
731 {
732         struct ctdb_db_context *ctdb_db;
733         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
734
735         ctdb_db = find_ctdb_db(ctdb, db_id);
736         if (!ctdb_db) {
737                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
738                 return -1;
739         }
740
741         if (client->db_id == db_id) {
742                 return 0;
743         }
744
745         if (ctdb_db->transaction_active) {
746                 return 1;
747         } else {
748                 return 0;
749         }
750 }
751
752 /*
753   backwards compatibility:
754
755   start a persistent store operation. passing both the key, header and
756   data to the daemon. If the client disconnects before it has issued
757   a persistent_update call to the daemon we trigger a full recovery
758   to ensure the databases are brought back in sync.
759   for now we ignore the recdata that the client has passed to us.
760  */
761 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
762                                       struct ctdb_req_control *c,
763                                       TDB_DATA recdata)
764 {
765         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
766
767         if (client == NULL) {
768                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
769                 return -1;
770         }
771
772         client->num_persistent_updates++;
773
774         return 0;
775 }
776
777 /* 
778   backwards compatibility:
779
780   called to tell ctdbd that it is no longer doing a persistent update 
781 */
782 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
783                                               struct ctdb_req_control *c,
784                                               TDB_DATA recdata)
785 {
786         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
787
788         if (client == NULL) {
789                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
790                 return -1;
791         }
792
793         if (client->num_persistent_updates > 0) {
794                 client->num_persistent_updates--;
795         }
796
797         return 0;
798 }
799
800
801 /*
802   backwards compatibility:
803
804   single record varient of ctdb_control_trans2_commit for older clients
805  */
806 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
807                                       struct ctdb_req_control *c, 
808                                       TDB_DATA recdata, bool *async_reply)
809 {
810         struct ctdb_marshall_buffer *m;
811         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
812         TDB_DATA key, data;
813
814         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
815             rec->keylen + rec->datalen) {
816                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
817                 return -1;
818         }
819
820         key.dptr = &rec->data[0];
821         key.dsize = rec->keylen;
822         data.dptr = &rec->data[rec->keylen];
823         data.dsize = rec->datalen;
824
825         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
826         CTDB_NO_MEMORY(ctdb, m);
827
828         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
829 }
830
831 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
832                                   uint32_t db_id,
833                                   uint64_t *seqnum)
834 {
835         int32_t ret;
836         struct ctdb_db_context *ctdb_db;
837         const char *keyname = CTDB_DB_SEQNUM_KEY;
838         TDB_DATA key;
839         TDB_DATA data;
840         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
841
842         ctdb_db = find_ctdb_db(ctdb, db_id);
843         if (!ctdb_db) {
844                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
845                 ret = -1;
846                 goto done;
847         }
848
849         key.dptr = (uint8_t *)discard_const(keyname);
850         key.dsize = strlen(keyname) + 1;
851
852         ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
853         if (ret != 0) {
854                 goto done;
855         }
856
857         if (data.dsize != sizeof(uint64_t)) {
858                 *seqnum = 0;
859                 goto done;
860         }
861
862         *seqnum = *(uint64_t *)data.dptr;
863
864 done:
865         talloc_free(mem_ctx);
866         return ret;
867 }
868
869 /**
870  * Get the sequence number of a persistent database.
871  */
872 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
873                                    TDB_DATA indata,
874                                    TDB_DATA *outdata)
875 {
876         uint32_t db_id;
877         int32_t ret;
878         uint64_t seqnum;
879
880         db_id = *(uint32_t *)indata.dptr;
881         ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
882         if (ret != 0) {
883                 goto done;
884         }
885
886         outdata->dsize = sizeof(uint64_t);
887         outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
888         if (outdata->dptr == NULL) {
889                 ret = -1;
890                 goto done;
891         }
892
893         *(outdata->dptr) = seqnum;
894
895 done:
896         return ret;
897 }