persistent_store_timeout: do not really time out the trans3_commit control in recovery
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  bookkeeping for one persistent commit that is being pushed out to other
  nodes with CTDB_CONTROL_UPDATE_RECORD
 */
struct ctdb_persistent_state {
        /* daemon context the commit runs in */
        struct ctdb_context *ctdb;
        /* originating control; replied to on completion or timeout */
        struct ctdb_req_control *c;
        /* error text recorded from the most recent failed node reply */
        const char *errormsg;
        /* controls sent whose reply has not been counted yet */
        uint32_t num_pending;
        /* status recorded from the most recent failed node reply */
        int32_t status;
        /* number of replies counted with a non-zero status */
        uint32_t num_failed;
        /* number of controls successfully sent */
        uint32_t num_sent;
};
37
38 /*
39   1) all nodes fail, and all nodes reply
40   2) some nodes fail, all nodes reply
41   3) some nodes timeout
42   4) all nodes succeed
43  */
44
45 /*
46   called when a node has acknowledged a ctdb_control_update_record call
47  */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49                                      int32_t status, TDB_DATA data, 
50                                      const char *errormsg,
51                                      void *private_data)
52 {
53         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
54                                                               struct ctdb_persistent_state);
55
56         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
57                 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
58                                    "during recovery\n"));
59                 return;
60         }
61
62         if (status != 0) {
63                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
64                          status, errormsg));
65                 state->status = status;
66                 state->errormsg = errormsg;
67                 state->num_failed++;
68         }
69         state->num_pending--;
70         if (state->num_pending == 0) {
71                 enum ctdb_trans2_commit_error etype;
72                 if (state->num_failed == state->num_sent) {
73                         etype = CTDB_TRANS2_COMMIT_ALLFAIL;
74                 } else if (state->num_failed != 0) {
75                         etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
76                 } else {
77                         etype = CTDB_TRANS2_COMMIT_SUCCESS;
78                 }
79                 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
80                 talloc_free(state);
81         }
82 }
83
84 /*
85   called if persistent store times out
86  */
87 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
88                                          struct timeval t, void *private_data)
89 {
90         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
91
92         if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
93                 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
94                                    "timeout during recovery\n"));
95                 return;
96         }
97
98         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
99                                    "timeout in ctdb_persistent_state");
100
101         talloc_free(state);
102 }
103
104 /*
105   store a set of persistent records - called from a ctdb client when it has updated
106   some records in a persistent database. The client will have the record
107   locked for the duration of this call. The client is the dmaster when 
108   this call is made
109  */
110 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
111                                    struct ctdb_req_control *c, 
112                                    TDB_DATA recdata, bool *async_reply)
113 {
114         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
115         struct ctdb_persistent_state *state;
116         int i;
117         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
118         struct ctdb_db_context *ctdb_db;
119
120         ctdb_db = find_ctdb_db(ctdb, m->db_id);
121         if (ctdb_db == NULL) {
122                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
123                                  "Unknown database db_id[0x%08x]\n", m->db_id));
124                 return -1;
125         }
126
127         if (client == NULL) {
128                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
129                 return -1;
130         }
131
132         if (ctdb_db->unhealthy_reason) {
133                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
134                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
135                 return -1;
136         }
137
138         /* handling num_persistent_updates is a bit strange - 
139            there are 3 cases
140              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
141                 They don't expect num_persistent_updates to be used at all
142
143              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
144                 this commit to then decrement it
145
146              3) new clients which use TRANS2 commit functions, and
147                 expect this function to increment the counter, and
148                 then have it decremented in ctdb_control_trans2_error
149                 or ctdb_control_trans2_finished
150         */
151         switch (c->opcode) {
152         case CTDB_CONTROL_PERSISTENT_STORE:
153                 if (ctdb_db->transaction_active) {
154                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
155                                           "transaction is active on database "
156                                           "db_id[0x%08x] - refusing persistent "
157                                          " store for client id[0x%08x]\n",
158                                           ctdb_db->db_id, client->client_id));
159                         return -1;
160                 }
161                 if (client->num_persistent_updates > 0) {
162                         client->num_persistent_updates--;
163                 }
164                 break;
165         case CTDB_CONTROL_TRANS2_COMMIT:
166                 if (ctdb_db->transaction_active) {
167                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
168                                          " already a transaction commit "
169                                          "active on db_id[0x%08x] - forbidding "
170                                          "client_id[0x%08x] to commit\n",
171                                          ctdb_db->db_id, client->client_id));
172                         return -1;
173                 }
174                 if (client->db_id != 0) {
175                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
176                                          "client-db_id[0x%08x] != 0 "
177                                          "(client_id[0x%08x])\n",
178                                          client->db_id, client->client_id));
179                         return -1;
180                 }
181                 client->num_persistent_updates++;
182                 ctdb_db->transaction_active = true;
183                 client->db_id = m->db_id;
184                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
185                                   " commit transaction on db id[0x%08x]\n",
186                                   client->client_id, client->db_id));
187                 break;
188         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
189                 /* already updated from the first commit */
190                 if (client->db_id != m->db_id) {
191                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
192                                          "retry: client-db_id[0x%08x] != "
193                                          "db_id[0x%08x] (client_id[0x%08x])\n",
194                                          client->db_id,
195                                          m->db_id, client->client_id));
196                         return -1;
197                 }
198                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
199                                     "transaction commit retry on "
200                                     "db_id[0x%08x]\n",
201                                     client->client_id, client->db_id));
202                 break;
203         }
204
205         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
206                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
207                 return -1;
208         }
209
210         state = talloc_zero(ctdb, struct ctdb_persistent_state);
211         CTDB_NO_MEMORY(ctdb, state);
212
213         state->ctdb = ctdb;
214         state->c    = c;
215
216         for (i=0;i<ctdb->vnn_map->size;i++) {
217                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
218                 int ret;
219
220                 /* only send to active nodes */
221                 if (node->flags & NODE_FLAGS_INACTIVE) {
222                         continue;
223                 }
224
225                 /* don't send to ourselves */
226                 if (node->pnn == ctdb->pnn) {
227                         continue;
228                 }
229                 
230                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
231                                                c->client_id, 0, recdata, 
232                                                ctdb_persistent_callback, state);
233                 if (ret == -1) {
234                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
235                         talloc_free(state);
236                         return -1;
237                 }
238
239                 state->num_pending++;
240                 state->num_sent++;
241         }
242
243         if (state->num_pending == 0) {
244                 talloc_free(state);
245                 return 0;
246         }
247         
248         /* we need to wait for the replies */
249         *async_reply = true;
250
251         /* need to keep the control structure around */
252         talloc_steal(state, c);
253
254         /* but we won't wait forever */
255         event_add_timed(ctdb->ev, state, 
256                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
257                         ctdb_persistent_store_timeout, state);
258
259         return 0;
260 }
261
262
263 /*
264  * Store a set of persistent records.
265  * This is used to roll out a transaction to all nodes.
266  */
267 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
268                                    struct ctdb_req_control *c,
269                                    TDB_DATA recdata, bool *async_reply)
270 {
271         struct ctdb_client *client;
272         struct ctdb_persistent_state *state;
273         int i;
274         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
275         struct ctdb_db_context *ctdb_db;
276
277         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
278                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
279                 return -1;
280         }
281
282         ctdb_db = find_ctdb_db(ctdb, m->db_id);
283         if (ctdb_db == NULL) {
284                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
285                                  "Unknown database db_id[0x%08x]\n", m->db_id));
286                 return -1;
287         }
288
289         client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
290         if (client == NULL) {
291                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
292                                  "to a client. Returning error\n"));
293                 return -1;
294         }
295
296         state = talloc_zero(ctdb, struct ctdb_persistent_state);
297         CTDB_NO_MEMORY(ctdb, state);
298
299         state->ctdb = ctdb;
300         state->c    = c;
301
302         for (i = 0; i < ctdb->vnn_map->size; i++) {
303                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
304                 int ret;
305
306                 /* only send to active nodes */
307                 if (node->flags & NODE_FLAGS_INACTIVE) {
308                         continue;
309                 }
310
311                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
312                                                CTDB_CONTROL_UPDATE_RECORD,
313                                                c->client_id, 0, recdata,
314                                                ctdb_persistent_callback,
315                                                state);
316                 if (ret == -1) {
317                         DEBUG(DEBUG_ERR,("Unable to send "
318                                          "CTDB_CONTROL_UPDATE_RECORD "
319                                          "to pnn %u\n", node->pnn));
320                         talloc_free(state);
321                         return -1;
322                 }
323
324                 state->num_pending++;
325                 state->num_sent++;
326         }
327
328         if (state->num_pending == 0) {
329                 talloc_free(state);
330                 return 0;
331         }
332
333         /* we need to wait for the replies */
334         *async_reply = true;
335
336         /* need to keep the control structure around */
337         talloc_steal(state, c);
338
339         /* but we won't wait forever */
340         event_add_timed(ctdb->ev, state,
341                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
342                         ctdb_persistent_store_timeout, state);
343
344         return 0;
345 }
346
347
/*
  state of one CTDB_CONTROL_UPDATE_RECORD request whose records are
  written by a child process
 */
struct ctdb_persistent_write_state {
        /* database the records are written to */
        struct ctdb_db_context *ctdb_db;
        /* marshalled records taken from the request data */
        struct ctdb_marshall_buffer *m;
        /* control to reply to once the write finished or timed out */
        struct ctdb_req_control *c;
};
353
354
355 /*
356   called from a child process to write the data
357  */
358 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
359 {
360         int ret, i;
361         struct ctdb_rec_data *rec = NULL;
362         struct ctdb_marshall_buffer *m = state->m;
363
364         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
365         if (ret == -1) {
366                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
367                                  state->ctdb_db->db_id));
368                 return -1;
369         }
370
371         for (i=0;i<m->count;i++) {
372                 struct ctdb_ltdb_header oldheader;
373                 struct ctdb_ltdb_header header;
374                 TDB_DATA key, data, olddata;
375                 TALLOC_CTX *tmp_ctx = talloc_new(state);
376
377                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
378                 
379                 if (rec == NULL) {
380                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
381                                          i, state->ctdb_db->db_id));
382                         talloc_free(tmp_ctx);
383                         goto failed;                    
384                 }
385
386                 /* fetch the old header and ensure the rsn is less than the new rsn */
387                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
388                 if (ret != 0) {
389                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
390                                          state->ctdb_db->db_id));
391                         talloc_free(tmp_ctx);
392                         goto failed;
393                 }
394
395                 if (oldheader.rsn >= header.rsn &&
396                     (olddata.dsize != data.dsize || 
397                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
398                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
399                                           state->ctdb_db->db_id, 
400                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
401                         talloc_free(tmp_ctx);
402                         goto failed;
403                 }
404
405                 talloc_free(tmp_ctx);
406
407                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
408                 if (ret != 0) {
409                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
410                                           state->ctdb_db->db_id));
411                         goto failed;
412                 }
413         }
414
415         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
416         if (ret == -1) {
417                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
418                                  state->ctdb_db->db_id));
419                 return -1;
420         }
421
422         return 0;
423         
424 failed:
425         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
426         return -1;
427 }
428
429
430 /*
431   called when we the child has completed the persistent write
432   on our behalf
433  */
434 static void ctdb_persistent_write_callback(int status, void *private_data)
435 {
436         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
437                                                                    struct ctdb_persistent_write_state);
438
439
440         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
441
442         talloc_free(state);
443 }
444
445 /*
446   called if our lockwait child times out
447  */
448 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
449                                          struct timeval t, void *private_data)
450 {
451         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
452                                                                    struct ctdb_persistent_write_state);
453         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
454         talloc_free(state);
455 }
456
/*
  handle for a child process that writes records to a database on behalf
  of the daemon
 */
struct childwrite_handle {
        /* daemon context, used for the childwrite statistics */
        struct ctdb_context *ctdb;
        /* database the child writes to */
        struct ctdb_db_context *ctdb_db;
        /* fd event watching the read end of the pipe */
        struct fd_event *fde;
        /* pipe: fd[0] is read by the parent, fd[1] is written by the child */
        int fd[2];
        /* pid of the child process */
        pid_t child;
        /* opaque argument handed to callback */
        void *private_data;
        /* invoked with the status reported by the child */
        void (*callback)(int, void *);
        /* time the handle was set up, used for latency accounting */
        struct timeval start_time;
};
467
468 static int childwrite_destructor(struct childwrite_handle *h)
469 {
470         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
471         kill(h->child, SIGKILL);
472         return 0;
473 }
474
475 /* called when the child process has finished writing the record to the
476    database
477 */
478 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
479                              uint16_t flags, void *private_data)
480 {
481         struct childwrite_handle *h = talloc_get_type(private_data, 
482                                                      struct childwrite_handle);
483         void *p = h->private_data;
484         void (*callback)(int, void *) = h->callback;
485         pid_t child = h->child;
486         TALLOC_CTX *tmp_ctx = talloc_new(ev);
487         int ret;
488         char c;
489
490         CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
491         CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
492
493         /* the handle needs to go away when the context is gone - when
494            the handle goes away this implicitly closes the pipe, which
495            kills the child */
496         talloc_steal(tmp_ctx, h);
497
498         talloc_set_destructor(h, NULL);
499
500         ret = read(h->fd[0], &c, 1);
501         if (ret < 1) {
502                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
503                 c = 1;
504         }
505
506         callback(c, p);
507
508         kill(child, SIGKILL);
509         talloc_free(tmp_ctx);
510 }
511
512 /* this creates a child process which will take out a tdb transaction
513    and write the record to the database.
514 */
515 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
516                                 void (*callback)(int, void *private_data),
517                                 struct ctdb_persistent_write_state *state)
518 {
519         struct childwrite_handle *result;
520         int ret;
521         pid_t parent = getpid();
522
523         CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
524         CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
525
526         if (!(result = talloc_zero(state, struct childwrite_handle))) {
527                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
528                 return NULL;
529         }
530
531         ret = pipe(result->fd);
532
533         if (ret != 0) {
534                 talloc_free(result);
535                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
536                 return NULL;
537         }
538
539         result->child = ctdb_fork(ctdb_db->ctdb);
540
541         if (result->child == (pid_t)-1) {
542                 close(result->fd[0]);
543                 close(result->fd[1]);
544                 talloc_free(result);
545                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
546                 return NULL;
547         }
548
549         result->callback = callback;
550         result->private_data = state;
551         result->ctdb = ctdb_db->ctdb;
552         result->ctdb_db = ctdb_db;
553
554         if (result->child == 0) {
555                 char c = 0;
556
557                 close(result->fd[0]);
558                 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
559                 ret = ctdb_persistent_store(state);
560                 if (ret != 0) {
561                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
562                         c = 1;
563                 }
564
565                 write(result->fd[1], &c, 1);
566
567                 /* make sure we die when our parent dies */
568                 while (kill(parent, 0) == 0 || errno != ESRCH) {
569                         sleep(5);
570                 }
571                 _exit(0);
572         }
573
574         close(result->fd[1]);
575         set_close_on_exec(result->fd[0]);
576
577         talloc_set_destructor(result, childwrite_destructor);
578
579         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
580
581         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
582                                    EVENT_FD_READ, childwrite_handler,
583                                    (void *)result);
584         if (result->fde == NULL) {
585                 talloc_free(result);
586                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
587                 return NULL;
588         }
589         tevent_fd_set_auto_close(result->fde);
590
591         result->start_time = timeval_current();
592
593         return result;
594 }
595
596 /* 
597    update a record on this node if the new record has a higher rsn than the
598    current record
599  */
600 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
601                                    struct ctdb_req_control *c, TDB_DATA recdata, 
602                                    bool *async_reply)
603 {
604         struct ctdb_db_context *ctdb_db;
605         struct ctdb_persistent_write_state *state;
606         struct childwrite_handle *handle;
607         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
608
609         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
610                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
611                 return -1;
612         }
613
614         ctdb_db = find_ctdb_db(ctdb, m->db_id);
615         if (ctdb_db == NULL) {
616                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
617                 return -1;
618         }
619
620         if (ctdb_db->unhealthy_reason) {
621                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
622                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
623                 return -1;
624         }
625
626         state = talloc(ctdb, struct ctdb_persistent_write_state);
627         CTDB_NO_MEMORY(ctdb, state);
628
629         state->ctdb_db = ctdb_db;
630         state->c       = c;
631         state->m       = m;
632
633         /* create a child process to take out a transaction and 
634            write the data.
635         */
636         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
637         if (handle == NULL) {
638                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
639                 talloc_free(state);
640                 return -1;
641         }
642
643         /* we need to wait for the replies */
644         *async_reply = true;
645
646         /* need to keep the control structure around */
647         talloc_steal(state, c);
648
649         /* but we won't wait forever */
650         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
651                         ctdb_persistent_lock_timeout, state);
652
653         return 0;
654 }
655
656
657 /*
658   called when a client has finished a local commit in a transaction to 
659   a persistent database
660  */
661 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
662                                      struct ctdb_req_control *c)
663 {
664         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
665         struct ctdb_db_context *ctdb_db;
666
667         ctdb_db = find_ctdb_db(ctdb, client->db_id);
668         if (ctdb_db == NULL) {
669                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
670                                  "Unknown database 0x%08x\n", client->db_id));
671                 return -1;
672         }
673         if (!ctdb_db->transaction_active) {
674                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
675                                  "Database 0x%08x has no transaction commit "
676                                  "started\n", client->db_id));
677                 return -1;
678         }
679
680         ctdb_db->transaction_active = false;
681         client->db_id = 0;
682
683         if (client->num_persistent_updates == 0) {
684                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
685                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
686                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
687                 return -1;
688         }
689         client->num_persistent_updates--;
690
691         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
692                             "transaction commit db_id[0x%08x]\n",
693                             client->client_id, ctdb_db->db_id));
694
695         return 0;
696 }
697
698 /*
699   called when a client gets an error committing its database
700   during a transaction commit
701  */
702 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
703                                   struct ctdb_req_control *c)
704 {
705         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
706         struct ctdb_db_context *ctdb_db;
707
708         ctdb_db = find_ctdb_db(ctdb, client->db_id);
709         if (ctdb_db == NULL) {
710                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
711                                  "Unknown database 0x%08x\n", client->db_id));
712                 return -1;
713         }
714         if (!ctdb_db->transaction_active) {
715                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
716                                  "Database 0x%08x has no transaction commit "
717                                  "started\n", client->db_id));
718                 return -1;
719         }
720
721         ctdb_db->transaction_active = false;
722         client->db_id = 0;
723
724         if (client->num_persistent_updates == 0) {
725                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
726         } else {
727                 client->num_persistent_updates--;
728         }
729
730         DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
731                          " db_id[0x%08x] - forcing recovery\n",
732                          ctdb_db->db_id));
733         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
734
735         return 0;
736 }
737
738 /**
739  * Tell whether a transaction is active on this node on the give DB.
740  */
741 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
742                                    struct ctdb_req_control *c,
743                                    uint32_t db_id)
744 {
745         struct ctdb_db_context *ctdb_db;
746         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
747
748         ctdb_db = find_ctdb_db(ctdb, db_id);
749         if (!ctdb_db) {
750                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
751                 return -1;
752         }
753
754         if (client->db_id == db_id) {
755                 return 0;
756         }
757
758         if (ctdb_db->transaction_active) {
759                 return 1;
760         } else {
761                 return 0;
762         }
763 }
764
765 /*
766   backwards compatibility:
767
768   start a persistent store operation. passing both the key, header and
769   data to the daemon. If the client disconnects before it has issued
770   a persistent_update call to the daemon we trigger a full recovery
771   to ensure the databases are brought back in sync.
772   for now we ignore the recdata that the client has passed to us.
773  */
774 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
775                                       struct ctdb_req_control *c,
776                                       TDB_DATA recdata)
777 {
778         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
779
780         if (client == NULL) {
781                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
782                 return -1;
783         }
784
785         client->num_persistent_updates++;
786
787         return 0;
788 }
789
790 /* 
791   backwards compatibility:
792
793   called to tell ctdbd that it is no longer doing a persistent update 
794 */
795 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
796                                               struct ctdb_req_control *c,
797                                               TDB_DATA recdata)
798 {
799         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
800
801         if (client == NULL) {
802                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
803                 return -1;
804         }
805
806         if (client->num_persistent_updates > 0) {
807                 client->num_persistent_updates--;
808         }
809
810         return 0;
811 }
812
813
814 /*
815   backwards compatibility:
816
817   single record varient of ctdb_control_trans2_commit for older clients
818  */
819 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
820                                       struct ctdb_req_control *c, 
821                                       TDB_DATA recdata, bool *async_reply)
822 {
823         struct ctdb_marshall_buffer *m;
824         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
825         TDB_DATA key, data;
826
827         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
828             rec->keylen + rec->datalen) {
829                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
830                 return -1;
831         }
832
833         key.dptr = &rec->data[0];
834         key.dsize = rec->keylen;
835         data.dptr = &rec->data[rec->keylen];
836         data.dsize = rec->datalen;
837
838         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
839         CTDB_NO_MEMORY(ctdb, m);
840
841         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
842 }
843
844 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
845                                   uint32_t db_id,
846                                   uint64_t *seqnum)
847 {
848         int32_t ret;
849         struct ctdb_db_context *ctdb_db;
850         const char *keyname = CTDB_DB_SEQNUM_KEY;
851         TDB_DATA key;
852         TDB_DATA data;
853         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
854
855         ctdb_db = find_ctdb_db(ctdb, db_id);
856         if (!ctdb_db) {
857                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
858                 ret = -1;
859                 goto done;
860         }
861
862         key.dptr = (uint8_t *)discard_const(keyname);
863         key.dsize = strlen(keyname) + 1;
864
865         ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
866         if (ret != 0) {
867                 goto done;
868         }
869
870         if (data.dsize != sizeof(uint64_t)) {
871                 *seqnum = 0;
872                 goto done;
873         }
874
875         *seqnum = *(uint64_t *)data.dptr;
876
877 done:
878         talloc_free(mem_ctx);
879         return ret;
880 }
881
882 /**
883  * Get the sequence number of a persistent database.
884  */
885 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
886                                    TDB_DATA indata,
887                                    TDB_DATA *outdata)
888 {
889         uint32_t db_id;
890         int32_t ret;
891         uint64_t seqnum;
892
893         db_id = *(uint32_t *)indata.dptr;
894         ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
895         if (ret != 0) {
896                 goto done;
897         }
898
899         outdata->dsize = sizeof(uint64_t);
900         outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
901         if (outdata->dptr == NULL) {
902                 ret = -1;
903                 goto done;
904         }
905
906         *(outdata->dptr) = seqnum;
907
908 done:
909         return ret;
910 }