persistent_callback: ignore the update-record return code of remote node in recovery
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  bookkeeping for one commit that is being distributed to other nodes
  with CTDB_CONTROL_UPDATE_RECORD
 */
struct ctdb_persistent_state {
        /* daemon context, used when the reply is sent to the client */
        struct ctdb_context *ctdb;
        /* the client's control request that still has to be answered */
        struct ctdb_req_control *c;
        /* error message taken from the most recent failed reply */
        const char *errormsg;
        /* controls that were sent and have not been counted as answered */
        uint32_t num_pending;
        /* status taken from the most recent failed reply */
        int32_t status;
        /* number of replies that carried a non-zero status */
        uint32_t num_failed;
        /* number of controls that were sent out */
        uint32_t num_sent;
};
37
38 /*
39   1) all nodes fail, and all nodes reply
40   2) some nodes fail, all nodes reply
41   3) some nodes timeout
42   4) all nodes succeed
43  */
44
45 /*
46   called when a node has acknowledged a ctdb_control_update_record call
47  */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49                                      int32_t status, TDB_DATA data, 
50                                      const char *errormsg,
51                                      void *private_data)
52 {
53         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
54                                                               struct ctdb_persistent_state);
55
56         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
57                 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
58                                    "during recovery\n"));
59                 return;
60         }
61
62         if (status != 0) {
63                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
64                          status, errormsg));
65                 state->status = status;
66                 state->errormsg = errormsg;
67                 state->num_failed++;
68         }
69         state->num_pending--;
70         if (state->num_pending == 0) {
71                 enum ctdb_trans2_commit_error etype;
72                 if (state->num_failed == state->num_sent) {
73                         etype = CTDB_TRANS2_COMMIT_ALLFAIL;
74                 } else if (state->num_failed != 0) {
75                         etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
76                 } else {
77                         etype = CTDB_TRANS2_COMMIT_SUCCESS;
78                 }
79                 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
80                 talloc_free(state);
81         }
82 }
83
84 /*
85   called if persistent store times out
86  */
87 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
88                                          struct timeval t, void *private_data)
89 {
90         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
91         
92         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
93                                    "timeout in ctdb_persistent_state");
94
95         talloc_free(state);
96 }
97
98 /*
99   store a set of persistent records - called from a ctdb client when it has updated
100   some records in a persistent database. The client will have the record
101   locked for the duration of this call. The client is the dmaster when 
102   this call is made
103  */
104 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
105                                    struct ctdb_req_control *c, 
106                                    TDB_DATA recdata, bool *async_reply)
107 {
108         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
109         struct ctdb_persistent_state *state;
110         int i;
111         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
112         struct ctdb_db_context *ctdb_db;
113
114         ctdb_db = find_ctdb_db(ctdb, m->db_id);
115         if (ctdb_db == NULL) {
116                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
117                                  "Unknown database db_id[0x%08x]\n", m->db_id));
118                 return -1;
119         }
120
121         if (client == NULL) {
122                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
123                 return -1;
124         }
125
126         if (ctdb_db->unhealthy_reason) {
127                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
128                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
129                 return -1;
130         }
131
132         /* handling num_persistent_updates is a bit strange - 
133            there are 3 cases
134              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
135                 They don't expect num_persistent_updates to be used at all
136
137              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
138                 this commit to then decrement it
139
140              3) new clients which use TRANS2 commit functions, and
141                 expect this function to increment the counter, and
142                 then have it decremented in ctdb_control_trans2_error
143                 or ctdb_control_trans2_finished
144         */
145         switch (c->opcode) {
146         case CTDB_CONTROL_PERSISTENT_STORE:
147                 if (ctdb_db->transaction_active) {
148                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
149                                           "transaction is active on database "
150                                           "db_id[0x%08x] - refusing persistent "
151                                          " store for client id[0x%08x]\n",
152                                           ctdb_db->db_id, client->client_id));
153                         return -1;
154                 }
155                 if (client->num_persistent_updates > 0) {
156                         client->num_persistent_updates--;
157                 }
158                 break;
159         case CTDB_CONTROL_TRANS2_COMMIT:
160                 if (ctdb_db->transaction_active) {
161                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
162                                          " already a transaction commit "
163                                          "active on db_id[0x%08x] - forbidding "
164                                          "client_id[0x%08x] to commit\n",
165                                          ctdb_db->db_id, client->client_id));
166                         return -1;
167                 }
168                 if (client->db_id != 0) {
169                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
170                                          "client-db_id[0x%08x] != 0 "
171                                          "(client_id[0x%08x])\n",
172                                          client->db_id, client->client_id));
173                         return -1;
174                 }
175                 client->num_persistent_updates++;
176                 ctdb_db->transaction_active = true;
177                 client->db_id = m->db_id;
178                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
179                                   " commit transaction on db id[0x%08x]\n",
180                                   client->client_id, client->db_id));
181                 break;
182         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
183                 /* already updated from the first commit */
184                 if (client->db_id != m->db_id) {
185                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
186                                          "retry: client-db_id[0x%08x] != "
187                                          "db_id[0x%08x] (client_id[0x%08x])\n",
188                                          client->db_id,
189                                          m->db_id, client->client_id));
190                         return -1;
191                 }
192                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
193                                     "transaction commit retry on "
194                                     "db_id[0x%08x]\n",
195                                     client->client_id, client->db_id));
196                 break;
197         }
198
199         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
200                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
201                 return -1;
202         }
203
204         state = talloc_zero(ctdb, struct ctdb_persistent_state);
205         CTDB_NO_MEMORY(ctdb, state);
206
207         state->ctdb = ctdb;
208         state->c    = c;
209
210         for (i=0;i<ctdb->vnn_map->size;i++) {
211                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
212                 int ret;
213
214                 /* only send to active nodes */
215                 if (node->flags & NODE_FLAGS_INACTIVE) {
216                         continue;
217                 }
218
219                 /* don't send to ourselves */
220                 if (node->pnn == ctdb->pnn) {
221                         continue;
222                 }
223                 
224                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
225                                                c->client_id, 0, recdata, 
226                                                ctdb_persistent_callback, state);
227                 if (ret == -1) {
228                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
229                         talloc_free(state);
230                         return -1;
231                 }
232
233                 state->num_pending++;
234                 state->num_sent++;
235         }
236
237         if (state->num_pending == 0) {
238                 talloc_free(state);
239                 return 0;
240         }
241         
242         /* we need to wait for the replies */
243         *async_reply = true;
244
245         /* need to keep the control structure around */
246         talloc_steal(state, c);
247
248         /* but we won't wait forever */
249         event_add_timed(ctdb->ev, state, 
250                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
251                         ctdb_persistent_store_timeout, state);
252
253         return 0;
254 }
255
256
257 /*
258  * Store a set of persistent records.
259  * This is used to roll out a transaction to all nodes.
260  */
261 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
262                                    struct ctdb_req_control *c,
263                                    TDB_DATA recdata, bool *async_reply)
264 {
265         struct ctdb_client *client;
266         struct ctdb_persistent_state *state;
267         int i;
268         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
269         struct ctdb_db_context *ctdb_db;
270
271         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
272                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
273                 return -1;
274         }
275
276         ctdb_db = find_ctdb_db(ctdb, m->db_id);
277         if (ctdb_db == NULL) {
278                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
279                                  "Unknown database db_id[0x%08x]\n", m->db_id));
280                 return -1;
281         }
282
283         client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
284         if (client == NULL) {
285                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
286                                  "to a client. Returning error\n"));
287                 return -1;
288         }
289
290         state = talloc_zero(ctdb, struct ctdb_persistent_state);
291         CTDB_NO_MEMORY(ctdb, state);
292
293         state->ctdb = ctdb;
294         state->c    = c;
295
296         for (i = 0; i < ctdb->vnn_map->size; i++) {
297                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
298                 int ret;
299
300                 /* only send to active nodes */
301                 if (node->flags & NODE_FLAGS_INACTIVE) {
302                         continue;
303                 }
304
305                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
306                                                CTDB_CONTROL_UPDATE_RECORD,
307                                                c->client_id, 0, recdata,
308                                                ctdb_persistent_callback,
309                                                state);
310                 if (ret == -1) {
311                         DEBUG(DEBUG_ERR,("Unable to send "
312                                          "CTDB_CONTROL_UPDATE_RECORD "
313                                          "to pnn %u\n", node->pnn));
314                         talloc_free(state);
315                         return -1;
316                 }
317
318                 state->num_pending++;
319                 state->num_sent++;
320         }
321
322         if (state->num_pending == 0) {
323                 talloc_free(state);
324                 return 0;
325         }
326
327         /* we need to wait for the replies */
328         *async_reply = true;
329
330         /* need to keep the control structure around */
331         talloc_steal(state, c);
332
333         /* but we won't wait forever */
334         event_add_timed(ctdb->ev, state,
335                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
336                         ctdb_persistent_store_timeout, state);
337
338         return 0;
339 }
340
341
/*
  state of one CTDB_CONTROL_UPDATE_RECORD request while a child process
  writes the records
 */
struct ctdb_persistent_write_state {
        /* database the records are written to */
        struct ctdb_db_context *ctdb_db;
        /* the marshalled records taken from the request */
        struct ctdb_marshall_buffer *m;
        /* the control request that is answered when the write is done */
        struct ctdb_req_control *c;
};
347
348
349 /*
350   called from a child process to write the data
351  */
352 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
353 {
354         int ret, i;
355         struct ctdb_rec_data *rec = NULL;
356         struct ctdb_marshall_buffer *m = state->m;
357
358         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
359         if (ret == -1) {
360                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
361                                  state->ctdb_db->db_id));
362                 return -1;
363         }
364
365         for (i=0;i<m->count;i++) {
366                 struct ctdb_ltdb_header oldheader;
367                 struct ctdb_ltdb_header header;
368                 TDB_DATA key, data, olddata;
369                 TALLOC_CTX *tmp_ctx = talloc_new(state);
370
371                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
372                 
373                 if (rec == NULL) {
374                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
375                                          i, state->ctdb_db->db_id));
376                         talloc_free(tmp_ctx);
377                         goto failed;                    
378                 }
379
380                 /* fetch the old header and ensure the rsn is less than the new rsn */
381                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
382                 if (ret != 0) {
383                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
384                                          state->ctdb_db->db_id));
385                         talloc_free(tmp_ctx);
386                         goto failed;
387                 }
388
389                 if (oldheader.rsn >= header.rsn &&
390                     (olddata.dsize != data.dsize || 
391                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
392                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
393                                           state->ctdb_db->db_id, 
394                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
395                         talloc_free(tmp_ctx);
396                         goto failed;
397                 }
398
399                 talloc_free(tmp_ctx);
400
401                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
402                 if (ret != 0) {
403                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
404                                           state->ctdb_db->db_id));
405                         goto failed;
406                 }
407         }
408
409         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
410         if (ret == -1) {
411                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
412                                  state->ctdb_db->db_id));
413                 return -1;
414         }
415
416         return 0;
417         
418 failed:
419         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
420         return -1;
421 }
422
423
424 /*
425   called when we the child has completed the persistent write
426   on our behalf
427  */
428 static void ctdb_persistent_write_callback(int status, void *private_data)
429 {
430         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
431                                                                    struct ctdb_persistent_write_state);
432
433
434         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
435
436         talloc_free(state);
437 }
438
439 /*
440   called if our lockwait child times out
441  */
442 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
443                                          struct timeval t, void *private_data)
444 {
445         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
446                                                                    struct ctdb_persistent_write_state);
447         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
448         talloc_free(state);
449 }
450
/*
  handle for one child process that writes records on our behalf
 */
struct childwrite_handle {
        /* daemon context, used for the statistics counters */
        struct ctdb_context *ctdb;
        /* database the child writes to */
        struct ctdb_db_context *ctdb_db;
        /* fd event watching the read end of the pipe */
        struct fd_event *fde;
        /* pipe between child and parent, the parent reads from fd[0] */
        int fd[2];
        /* pid of the child process */
        pid_t child;
        /* second argument passed to callback */
        void *private_data;
        /* called with the result byte read from the child */
        void (*callback)(int, void *);
        /* time the child was started, used for the latency statistics */
        struct timeval start_time;
};
461
462 static int childwrite_destructor(struct childwrite_handle *h)
463 {
464         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
465         kill(h->child, SIGKILL);
466         return 0;
467 }
468
469 /* called when the child process has finished writing the record to the
470    database
471 */
472 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
473                              uint16_t flags, void *private_data)
474 {
475         struct childwrite_handle *h = talloc_get_type(private_data, 
476                                                      struct childwrite_handle);
477         void *p = h->private_data;
478         void (*callback)(int, void *) = h->callback;
479         pid_t child = h->child;
480         TALLOC_CTX *tmp_ctx = talloc_new(ev);
481         int ret;
482         char c;
483
484         CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
485         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
486
487         /* the handle needs to go away when the context is gone - when
488            the handle goes away this implicitly closes the pipe, which
489            kills the child */
490         talloc_steal(tmp_ctx, h);
491
492         talloc_set_destructor(h, NULL);
493
494         ret = read(h->fd[0], &c, 1);
495         if (ret < 1) {
496                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
497                 c = 1;
498         }
499
500         callback(c, p);
501
502         kill(child, SIGKILL);
503         talloc_free(tmp_ctx);
504 }
505
506 /* this creates a child process which will take out a tdb transaction
507    and write the record to the database.
508 */
509 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
510                                 void (*callback)(int, void *private_data),
511                                 struct ctdb_persistent_write_state *state)
512 {
513         struct childwrite_handle *result;
514         int ret;
515         pid_t parent = getpid();
516
517         CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
518         CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
519
520         if (!(result = talloc_zero(state, struct childwrite_handle))) {
521                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
522                 return NULL;
523         }
524
525         ret = pipe(result->fd);
526
527         if (ret != 0) {
528                 talloc_free(result);
529                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
530                 return NULL;
531         }
532
533         result->child = ctdb_fork(ctdb_db->ctdb);
534
535         if (result->child == (pid_t)-1) {
536                 close(result->fd[0]);
537                 close(result->fd[1]);
538                 talloc_free(result);
539                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
540                 return NULL;
541         }
542
543         result->callback = callback;
544         result->private_data = state;
545         result->ctdb = ctdb_db->ctdb;
546         result->ctdb_db = ctdb_db;
547
548         if (result->child == 0) {
549                 char c = 0;
550
551                 close(result->fd[0]);
552                 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
553                 ret = ctdb_persistent_store(state);
554                 if (ret != 0) {
555                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
556                         c = 1;
557                 }
558
559                 write(result->fd[1], &c, 1);
560
561                 /* make sure we die when our parent dies */
562                 while (kill(parent, 0) == 0 || errno != ESRCH) {
563                         sleep(5);
564                 }
565                 _exit(0);
566         }
567
568         close(result->fd[1]);
569         set_close_on_exec(result->fd[0]);
570
571         talloc_set_destructor(result, childwrite_destructor);
572
573         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
574
575         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
576                                    EVENT_FD_READ, childwrite_handler,
577                                    (void *)result);
578         if (result->fde == NULL) {
579                 talloc_free(result);
580                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
581                 return NULL;
582         }
583         tevent_fd_set_auto_close(result->fde);
584
585         result->start_time = timeval_current();
586
587         return result;
588 }
589
590 /* 
591    update a record on this node if the new record has a higher rsn than the
592    current record
593  */
594 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
595                                    struct ctdb_req_control *c, TDB_DATA recdata, 
596                                    bool *async_reply)
597 {
598         struct ctdb_db_context *ctdb_db;
599         struct ctdb_persistent_write_state *state;
600         struct childwrite_handle *handle;
601         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
602
603         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
604                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
605                 return -1;
606         }
607
608         ctdb_db = find_ctdb_db(ctdb, m->db_id);
609         if (ctdb_db == NULL) {
610                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
611                 return -1;
612         }
613
614         if (ctdb_db->unhealthy_reason) {
615                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
616                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
617                 return -1;
618         }
619
620         state = talloc(ctdb, struct ctdb_persistent_write_state);
621         CTDB_NO_MEMORY(ctdb, state);
622
623         state->ctdb_db = ctdb_db;
624         state->c       = c;
625         state->m       = m;
626
627         /* create a child process to take out a transaction and 
628            write the data.
629         */
630         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
631         if (handle == NULL) {
632                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
633                 talloc_free(state);
634                 return -1;
635         }
636
637         /* we need to wait for the replies */
638         *async_reply = true;
639
640         /* need to keep the control structure around */
641         talloc_steal(state, c);
642
643         /* but we won't wait forever */
644         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
645                         ctdb_persistent_lock_timeout, state);
646
647         return 0;
648 }
649
650
651 /*
652   called when a client has finished a local commit in a transaction to 
653   a persistent database
654  */
655 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
656                                      struct ctdb_req_control *c)
657 {
658         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
659         struct ctdb_db_context *ctdb_db;
660
661         ctdb_db = find_ctdb_db(ctdb, client->db_id);
662         if (ctdb_db == NULL) {
663                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
664                                  "Unknown database 0x%08x\n", client->db_id));
665                 return -1;
666         }
667         if (!ctdb_db->transaction_active) {
668                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
669                                  "Database 0x%08x has no transaction commit "
670                                  "started\n", client->db_id));
671                 return -1;
672         }
673
674         ctdb_db->transaction_active = false;
675         client->db_id = 0;
676
677         if (client->num_persistent_updates == 0) {
678                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
679                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
680                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
681                 return -1;
682         }
683         client->num_persistent_updates--;
684
685         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
686                             "transaction commit db_id[0x%08x]\n",
687                             client->client_id, ctdb_db->db_id));
688
689         return 0;
690 }
691
692 /*
693   called when a client gets an error committing its database
694   during a transaction commit
695  */
696 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
697                                   struct ctdb_req_control *c)
698 {
699         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
700         struct ctdb_db_context *ctdb_db;
701
702         ctdb_db = find_ctdb_db(ctdb, client->db_id);
703         if (ctdb_db == NULL) {
704                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
705                                  "Unknown database 0x%08x\n", client->db_id));
706                 return -1;
707         }
708         if (!ctdb_db->transaction_active) {
709                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
710                                  "Database 0x%08x has no transaction commit "
711                                  "started\n", client->db_id));
712                 return -1;
713         }
714
715         ctdb_db->transaction_active = false;
716         client->db_id = 0;
717
718         if (client->num_persistent_updates == 0) {
719                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
720         } else {
721                 client->num_persistent_updates--;
722         }
723
724         DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
725                          " db_id[0x%08x] - forcing recovery\n",
726                          ctdb_db->db_id));
727         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
728
729         return 0;
730 }
731
732 /**
733  * Tell whether a transaction is active on this node on the give DB.
734  */
735 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
736                                    struct ctdb_req_control *c,
737                                    uint32_t db_id)
738 {
739         struct ctdb_db_context *ctdb_db;
740         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
741
742         ctdb_db = find_ctdb_db(ctdb, db_id);
743         if (!ctdb_db) {
744                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
745                 return -1;
746         }
747
748         if (client->db_id == db_id) {
749                 return 0;
750         }
751
752         if (ctdb_db->transaction_active) {
753                 return 1;
754         } else {
755                 return 0;
756         }
757 }
758
759 /*
760   backwards compatibility:
761
762   start a persistent store operation. passing both the key, header and
763   data to the daemon. If the client disconnects before it has issued
764   a persistent_update call to the daemon we trigger a full recovery
765   to ensure the databases are brought back in sync.
766   for now we ignore the recdata that the client has passed to us.
767  */
768 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
769                                       struct ctdb_req_control *c,
770                                       TDB_DATA recdata)
771 {
772         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
773
774         if (client == NULL) {
775                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
776                 return -1;
777         }
778
779         client->num_persistent_updates++;
780
781         return 0;
782 }
783
784 /* 
785   backwards compatibility:
786
787   called to tell ctdbd that it is no longer doing a persistent update 
788 */
789 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
790                                               struct ctdb_req_control *c,
791                                               TDB_DATA recdata)
792 {
793         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
794
795         if (client == NULL) {
796                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
797                 return -1;
798         }
799
800         if (client->num_persistent_updates > 0) {
801                 client->num_persistent_updates--;
802         }
803
804         return 0;
805 }
806
807
808 /*
809   backwards compatibility:
810
811   single record varient of ctdb_control_trans2_commit for older clients
812  */
813 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
814                                       struct ctdb_req_control *c, 
815                                       TDB_DATA recdata, bool *async_reply)
816 {
817         struct ctdb_marshall_buffer *m;
818         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
819         TDB_DATA key, data;
820
821         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
822             rec->keylen + rec->datalen) {
823                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
824                 return -1;
825         }
826
827         key.dptr = &rec->data[0];
828         key.dsize = rec->keylen;
829         data.dptr = &rec->data[rec->keylen];
830         data.dsize = rec->datalen;
831
832         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
833         CTDB_NO_MEMORY(ctdb, m);
834
835         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
836 }
837
838 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
839                                   uint32_t db_id,
840                                   uint64_t *seqnum)
841 {
842         int32_t ret;
843         struct ctdb_db_context *ctdb_db;
844         const char *keyname = CTDB_DB_SEQNUM_KEY;
845         TDB_DATA key;
846         TDB_DATA data;
847         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
848
849         ctdb_db = find_ctdb_db(ctdb, db_id);
850         if (!ctdb_db) {
851                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
852                 ret = -1;
853                 goto done;
854         }
855
856         key.dptr = (uint8_t *)discard_const(keyname);
857         key.dsize = strlen(keyname) + 1;
858
859         ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
860         if (ret != 0) {
861                 goto done;
862         }
863
864         if (data.dsize != sizeof(uint64_t)) {
865                 *seqnum = 0;
866                 goto done;
867         }
868
869         *seqnum = *(uint64_t *)data.dptr;
870
871 done:
872         talloc_free(mem_ctx);
873         return ret;
874 }
875
876 /**
877  * Get the sequence number of a persistent database.
878  */
879 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
880                                    TDB_DATA indata,
881                                    TDB_DATA *outdata)
882 {
883         uint32_t db_id;
884         int32_t ret;
885         uint64_t seqnum;
886
887         db_id = *(uint32_t *)indata.dptr;
888         ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
889         if (ret != 0) {
890                 goto done;
891         }
892
893         outdata->dsize = sizeof(uint64_t);
894         outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
895         if (outdata->dptr == NULL) {
896                 ret = -1;
897                 goto done;
898         }
899
900         *(outdata->dptr) = seqnum;
901
902 done:
903         return ret;
904 }