94c070077ea73f6d1ef67508059c5bb4eafa0f3f
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_freeze.c
1 /* 
2    ctdb freeze handling
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 #include "replace.h"
20 #include "system/network.h"
21 #include "system/filesys.h"
22 #include "system/wait.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26
27 #include "lib/tdb_wrap/tdb_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/util/debug.h"
30
31 #include "ctdb_private.h"
32 #include "ctdb_logging.h"
33
34 #include "common/rb_tree.h"
35 #include "common/common.h"
36
37 /**
38  * Cancel a transaction on database
39  */
40 static int db_transaction_cancel_handler(struct ctdb_db_context *ctdb_db,
41                                          void *private_data)
42 {
43         int ret;
44
45         tdb_add_flags(ctdb_db->ltdb->tdb, TDB_NOLOCK);
46         ret = tdb_transaction_cancel(ctdb_db->ltdb->tdb);
47         if (ret != 0) {
48                 DEBUG(DEBUG_ERR, ("Failed to cancel transaction for db %s\n",
49                                   ctdb_db->db_name));
50         }
51         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_NOLOCK);
52         return 0;
53 }
54
55 /**
56  * Start a transaction on database
57  */
58 static int db_transaction_start_handler(struct ctdb_db_context *ctdb_db,
59                                         void *private_data)
60 {
61         bool freeze_transaction_started = *(bool *)private_data;
62         int ret;
63
64         tdb_add_flags(ctdb_db->ltdb->tdb, TDB_NOLOCK);
65         if (freeze_transaction_started) {
66                 ret = tdb_transaction_cancel(ctdb_db->ltdb->tdb);
67                 if (ret != 0) {
68                         DEBUG(DEBUG_ERR,
69                               ("Failed to cancel transaction for db %s\n",
70                                ctdb_db->db_name));
71                 }
72         }
73         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
74         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_NOLOCK);
75         if (ret != 0) {
76                 DEBUG(DEBUG_ERR, ("Failed to start transaction for db %s\n",
77                                   ctdb_db->db_name));
78                 return -1;
79         }
80         return 0;
81 }
82
83 /**
84  * Commit a transaction on database
85  */
86 static int db_transaction_commit_handler(struct ctdb_db_context *ctdb_db,
87                                          void *private_data)
88 {
89         int healthy_nodes = *(int *)private_data;
90         int ret;
91
92         tdb_add_flags(ctdb_db->ltdb->tdb, TDB_NOLOCK);
93         ret = tdb_transaction_commit(ctdb_db->ltdb->tdb);
94         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_NOLOCK);
95         if (ret != 0) {
96                 DEBUG(DEBUG_ERR, ("Failed to commit transaction for db %s\n",
97                                   ctdb_db->db_name));
98                 return -1;
99         }
100
101         ret = ctdb_update_persistent_health(ctdb_db->ctdb, ctdb_db, NULL,
102                                             healthy_nodes);
103         if (ret != 0) {
104                 DEBUG(DEBUG_ERR, ("Failed to update persistent health for db %s\n",
105                                   ctdb_db->db_name));
106         }
107         return ret;
108 }
109
/*
 * One party waiting for a single database to become frozen.  Waiters are
 * chained on the "waiters" list of a struct ctdb_db_freeze_handle.
 */
struct ctdb_db_freeze_waiter {
        struct ctdb_db_freeze_waiter *next;
        struct ctdb_db_freeze_waiter *prev;
        struct ctdb_context *ctdb;
        void *private_data;     /* context consumed by the waiter's destructor */
        int32_t status;         /* -1 when created, set to 0 once frozen */
};
117
/*
 * Handle for the freeze of a single database.  It records the database
 * being frozen, the lock request made for it and the list of parties
 * waiting for the freeze to complete.
 */
struct ctdb_db_freeze_handle {
        struct ctdb_db_context *ctdb_db;        /* database being frozen */
        struct lock_request *lreq;              /* result of ctdb_lock_db() */
        struct ctdb_db_freeze_waiter *waiters;  /* pending waiters */
};
124
125 /**
126  * Called when freeing database freeze handle
127  */
128 static int ctdb_db_freeze_handle_destructor(struct ctdb_db_freeze_handle *h)
129 {
130         struct ctdb_db_context *ctdb_db = h->ctdb_db;
131
132         DEBUG(DEBUG_ERR, ("Release freeze handle for db %s\n",
133                           ctdb_db->db_name));
134
135         /* Cancel any pending transactions */
136         if (ctdb_db->freeze_transaction_started) {
137                 db_transaction_cancel_handler(ctdb_db, NULL);
138                 ctdb_db->freeze_transaction_started = false;
139         }
140         ctdb_db->freeze_mode = CTDB_FREEZE_NONE;
141         ctdb_db->freeze_handle = NULL;
142
143         talloc_free(h->lreq);
144         return 0;
145 }
146
147 /**
148  * Called when a database is frozen
149  */
150 static void ctdb_db_freeze_handler(void *private_data, bool locked)
151 {
152         struct ctdb_db_freeze_handle *h = talloc_get_type_abort(
153                 private_data, struct ctdb_db_freeze_handle);
154         struct ctdb_db_freeze_waiter *w;
155
156         if (h->ctdb_db->freeze_mode == CTDB_FREEZE_FROZEN) {
157                 DEBUG(DEBUG_ERR, ("Freeze db child died - unfreezing\n"));
158                 h->ctdb_db->freeze_mode = CTDB_FREEZE_NONE;
159                 talloc_free(h);
160                 return;
161         }
162
163         if (!locked) {
164                 DEBUG(DEBUG_ERR, ("Failed to get db lock for %s\n",
165                                   h->ctdb_db->db_name));
166                 h->ctdb_db->freeze_mode = CTDB_FREEZE_NONE;
167                 talloc_free(h);
168                 return;
169         }
170
171         h->ctdb_db->freeze_mode = CTDB_FREEZE_FROZEN;
172
173         /* notify the waiters */
174         while ((w = h->waiters) != NULL) {
175                 w->status = 0;
176                 DLIST_REMOVE(h->waiters, w);
177                 talloc_free(w);
178         }
179 }
180
181 /**
182  * Start freeze process for a database
183  */
184 static void ctdb_start_db_freeze(struct ctdb_db_context *ctdb_db)
185 {
186         struct ctdb_db_freeze_handle *h;
187
188         if (ctdb_db->freeze_mode == CTDB_FREEZE_FROZEN) {
189                 return;
190         }
191
192         if (ctdb_db->freeze_handle != NULL) {
193                 return;
194         }
195
196         DEBUG(DEBUG_ERR, ("Freeze db: %s\n", ctdb_db->db_name));
197
198         ctdb_stop_vacuuming(ctdb_db->ctdb);
199
200         h = talloc_zero(ctdb_db, struct ctdb_db_freeze_handle);
201         CTDB_NO_MEMORY_FATAL(ctdb_db->ctdb, h);
202
203         h->ctdb_db = ctdb_db;
204         h->lreq = ctdb_lock_db(h, ctdb_db, false, ctdb_db_freeze_handler, h);
205         CTDB_NO_MEMORY_FATAL(ctdb_db->ctdb, h->lreq);
206         talloc_set_destructor(h, ctdb_db_freeze_handle_destructor);
207
208         ctdb_db->freeze_handle = h;
209         ctdb_db->freeze_mode = CTDB_FREEZE_PENDING;
210 }
211
212 /**
213  * Reply to a waiter for db freeze
214  */
215 static int ctdb_db_freeze_waiter_destructor(struct ctdb_db_freeze_waiter *w)
216 {
217         /* 'c' pointer is talloc_memdup(), so cannot use talloc_get_type */
218         struct ctdb_req_control_old *c =
219                 (struct ctdb_req_control_old *)w->private_data;
220
221         ctdb_request_control_reply(w->ctdb, c, NULL, w->status, NULL);
222         return 0;
223 }
224
225 /**
226  * freeze a database
227  */
228 int32_t ctdb_control_db_freeze(struct ctdb_context *ctdb,
229                                struct ctdb_req_control_old *c,
230                                uint32_t db_id,
231                                bool *async_reply)
232 {
233         struct ctdb_db_context *ctdb_db;
234         struct ctdb_db_freeze_waiter *w;
235
236         ctdb_db = find_ctdb_db(ctdb, db_id);
237         if (ctdb_db == NULL) {
238                 DEBUG(DEBUG_ERR, ("Freeze db for unknown dbid 0x%08x\n", db_id));
239                 return -1;
240         }
241
242         if (ctdb_db->freeze_mode == CTDB_FREEZE_FROZEN) {
243                 DEBUG(DEBUG_ERR, ("Freeze db: %s frozen\n", ctdb_db->db_name));
244                 return 0;
245         }
246
247         ctdb_start_db_freeze(ctdb_db);
248
249         /* add ourselves to the list of waiters */
250         w = talloc(ctdb_db->freeze_handle, struct ctdb_db_freeze_waiter);
251         CTDB_NO_MEMORY(ctdb, w);
252         w->ctdb = ctdb;
253         w->private_data = talloc_steal(w, c);
254         w->status = -1;
255         talloc_set_destructor(w, ctdb_db_freeze_waiter_destructor);
256         DLIST_ADD(ctdb_db->freeze_handle->waiters, w);
257
258         *async_reply = true;
259         return 0;
260 }
261
262 /**
263  * Thaw a database
264  */
265 int32_t ctdb_control_db_thaw(struct ctdb_context *ctdb, uint32_t db_id)
266 {
267         struct ctdb_db_context *ctdb_db;
268
269         ctdb_db = find_ctdb_db(ctdb, db_id);
270         if (ctdb_db == NULL) {
271                 DEBUG(DEBUG_ERR, ("Thaw db for unknown dbid 0x%08x\n", db_id));
272                 return -1;
273         }
274
275         DEBUG(DEBUG_ERR, ("Thaw db: %s generation %u\n", ctdb_db->db_name,
276                           ctdb_db->generation));
277
278         TALLOC_FREE(ctdb_db->freeze_handle);
279         ctdb_call_resend_db(ctdb_db);
280         return 0;
281 }
282
283
/*
  One control request waiting for a per-priority freeze to complete.
  Waiters are chained on the "waiters" list of a struct
  ctdb_freeze_handle.
 */
struct ctdb_freeze_waiter {
        struct ctdb_freeze_waiter *next;
        struct ctdb_freeze_waiter *prev;
        struct ctdb_context *ctdb;
        struct ctdb_req_control_old *c; /* request to reply to */
        uint32_t priority;
        int32_t status;                 /* -1 when created, 0 once frozen */
};
295
/*
 * Handle for the freeze of every database of one priority.  The three
 * counters track how many databases are involved and how many of them
 * have reported success or failure so far.
 */
struct ctdb_freeze_handle {
        struct ctdb_context *ctdb;
        uint32_t priority;
        unsigned int num_total;
        unsigned int num_locked;
        unsigned int num_failed;
        struct ctdb_freeze_waiter *waiters;
};
303
304 static int db_thaw(struct ctdb_db_context *ctdb_db, void *private_data)
305 {
306         talloc_free(ctdb_db->freeze_handle);
307         return 0;
308 }
309
310 /*
311   destroy a freeze handle
312  */
313 static int ctdb_freeze_handle_destructor(struct ctdb_freeze_handle *h)
314 {
315         struct ctdb_context *ctdb = h->ctdb;
316
317         DEBUG(DEBUG_ERR,("Release freeze handle for prio %u\n", h->priority));
318
319         /* cancel any pending transactions */
320         if (ctdb->freeze_transaction_started) {
321                 ctdb_db_prio_iterator(ctdb, h->priority,
322                                       db_transaction_cancel_handler, NULL);
323                 ctdb->freeze_transaction_started = false;
324         }
325
326         ctdb_db_prio_iterator(ctdb, h->priority, db_thaw, NULL);
327
328         ctdb->freeze_mode[h->priority]    = CTDB_FREEZE_NONE;
329         ctdb->freeze_handles[h->priority] = NULL;
330
331         return 0;
332 }
333
334 /*
335   called when the child writes its status to us
336  */
337 static void ctdb_freeze_lock_handler(void *private_data, bool locked)
338 {
339         struct ctdb_freeze_handle *h = talloc_get_type_abort(private_data,
340                                                              struct ctdb_freeze_handle);
341         struct ctdb_freeze_waiter *w;
342
343         if (h->ctdb->freeze_mode[h->priority] == CTDB_FREEZE_FROZEN) {
344                 DEBUG(DEBUG_INFO,("freeze child died - unfreezing\n"));
345                 talloc_free(h);
346                 return;
347         }
348
349         if (!locked) {
350                 DEBUG(DEBUG_ERR,("Failed to get locks in ctdb_freeze_child\n"));
351                 /* we didn't get the locks - destroy the handle */
352                 talloc_free(h);
353                 return;
354         }
355
356         h->ctdb->freeze_mode[h->priority] = CTDB_FREEZE_FROZEN;
357
358         /* notify the waiters */
359         if (h != h->ctdb->freeze_handles[h->priority]) {
360                 DEBUG(DEBUG_ERR,("lockwait finished but h is not linked\n"));
361         }
362         while ((w = h->waiters)) {
363                 w->status = 0;
364                 DLIST_REMOVE(h->waiters, w);
365                 talloc_free(w);
366         }
367 }
368
369 /**
370  * When single database is frozen
371  */
372 static int db_freeze_waiter_destructor(struct ctdb_db_freeze_waiter *w)
373 {
374         struct ctdb_freeze_handle *h = talloc_get_type_abort(
375                 w->private_data, struct ctdb_freeze_handle);
376
377         if (w->status == 0) {
378                 h->num_locked += 1;
379         } else {
380                 h->num_failed += 1;
381         }
382
383         /* Call ctdb_freeze_lock_handler() only when the status of all
384          * databases is known.
385          */
386         if (h->num_locked + h->num_failed == h->num_total) {
387                 bool locked;
388
389                 if (h->num_locked == h->num_total) {
390                         locked = true;
391                 } else {
392                         locked = false;
393                 }
394                 ctdb_freeze_lock_handler(h, locked);
395         }
396         return 0;
397 }
398
/**
 * Iterator callback that counts databases.
 *
 * private_data points to an int counter, which is incremented by one on
 * each call.  ctdb_db is not used.  Always returns 0.
 */
static int db_count(struct ctdb_db_context *ctdb_db, void *private_data)
{
        int *counter = (int *)private_data;

        (*counter)++;

        return 0;
}
410
411 /**
412  * Freeze a single database
413  */
414 static int db_freeze(struct ctdb_db_context *ctdb_db, void *private_data)
415 {
416         struct ctdb_freeze_handle *h = talloc_get_type_abort(
417                 private_data, struct ctdb_freeze_handle);
418         struct ctdb_db_freeze_waiter *w;
419
420         ctdb_start_db_freeze(ctdb_db);
421
422         w = talloc(ctdb_db->freeze_handle, struct ctdb_db_freeze_waiter);
423         CTDB_NO_MEMORY(h->ctdb, w);
424         w->ctdb = h->ctdb;
425         w->private_data = h;
426         w->status = -1;
427         talloc_set_destructor(w, db_freeze_waiter_destructor);
428
429         if (ctdb_db->freeze_mode == CTDB_FREEZE_FROZEN) {
430                 /* Early return if already frozen */
431                 w->status = 0;
432                 talloc_free(w);
433                 return 0;
434         }
435
436         DLIST_ADD(ctdb_db->freeze_handle->waiters, w);
437
438         return 0;
439 }
440
441 /*
442   start the freeze process for a certain priority
443  */
444 static void ctdb_start_freeze(struct ctdb_context *ctdb, uint32_t priority)
445 {
446         struct ctdb_freeze_handle *h;
447         int ret;
448
449         if ((priority < 1) || (priority > NUM_DB_PRIORITIES)) {
450                 DEBUG(DEBUG_ERR,(__location__ " Invalid db priority : %u\n", priority));
451                 ctdb_fatal(ctdb, "Internal error");
452         }
453
454         if (ctdb->freeze_mode[priority] == CTDB_FREEZE_FROZEN) {
455                 int count = 0;
456
457                 /*
458                  * Check if all the databases are frozen
459                  *
460                  * It's possible that the databases can get attached after
461                  * initial freeze. This typically happens during startup as
462                  * CTDB will only attach persistent databases and go in to
463                  * startup freeze.  The recovery master during recovery will
464                  * attach all the missing databases.
465                  */
466
467                 h = ctdb->freeze_handles[priority];
468                 if (h == NULL) {
469                         ctdb->freeze_mode[priority] = CTDB_FREEZE_NONE;
470                         return;
471                 }
472
473                 ret = ctdb_db_prio_iterator(ctdb, priority, db_count, &count);
474                 if (ret != 0) {
475                         TALLOC_FREE(ctdb->freeze_handles[priority]);
476                         ctdb->freeze_mode[priority] = CTDB_FREEZE_NONE;
477                         return;
478                 }
479
480                 if (count != h->num_total) {
481                         DEBUG(DEBUG_ERR, ("Freeze priority %u: incremental\n",
482                                           priority));
483
484                         h->num_total = count;
485                         h->num_locked = 0;
486                         h->num_failed = 0;
487
488                         ctdb->freeze_mode[priority] = CTDB_FREEZE_PENDING;
489
490                         ret = ctdb_db_prio_iterator(ctdb, priority,
491                                                     db_freeze, h);
492                         if (ret != 0) {
493                                 TALLOC_FREE(ctdb->freeze_handles[priority]);
494                                 ctdb->freeze_mode[priority] = CTDB_FREEZE_NONE;
495                         }
496                 }
497                 return;
498         }
499
500         if (ctdb->freeze_handles[priority] != NULL) {
501                 /* already trying to freeze */
502                 return;
503         }
504
505         DEBUG(DEBUG_ERR, ("Freeze priority %u\n", priority));
506
507         /* Stop any vacuuming going on: we don't want to wait. */
508         ctdb_stop_vacuuming(ctdb);
509
510         /* create freeze lock children for each database */
511         h = talloc_zero(ctdb, struct ctdb_freeze_handle);
512         CTDB_NO_MEMORY_FATAL(ctdb, h);
513         h->ctdb = ctdb;
514         h->priority = priority;
515         talloc_set_destructor(h, ctdb_freeze_handle_destructor);
516         ctdb->freeze_handles[priority] = h;
517
518         ret = ctdb_db_prio_iterator(ctdb, priority, db_count, &h->num_total);
519         if (ret != 0) {
520                 talloc_free(h);
521                 return;
522         }
523
524         ctdb->freeze_mode[priority] = CTDB_FREEZE_PENDING;
525
526         ret = ctdb_db_prio_iterator(ctdb, priority, db_freeze, h);
527         if (ret != 0) {
528                 talloc_free(h);
529                 return;
530         }
531
532         if (h->num_total == 0) {
533                 ctdb->freeze_mode[priority] = CTDB_FREEZE_FROZEN;
534         }
535 }
536
537 /*
538   destroy a waiter for a freeze mode change
539  */
540 static int ctdb_freeze_waiter_destructor(struct ctdb_freeze_waiter *w)
541 {
542         ctdb_request_control_reply(w->ctdb, w->c, NULL, w->status, NULL);
543         return 0;
544 }
545
546 /*
547   freeze the databases
548  */
549 int32_t ctdb_control_freeze(struct ctdb_context *ctdb, struct ctdb_req_control_old *c, bool *async_reply)
550 {
551         struct ctdb_freeze_waiter *w;
552         uint32_t priority;
553
554         priority = (uint32_t)c->srvid;
555
556         if (priority == 0) {
557                 DEBUG(DEBUG_ERR,("Freeze priority 0 requested, remapping to priority 1\n"));
558                 priority = 1;
559         }
560
561         if ((priority < 1) || (priority > NUM_DB_PRIORITIES)) {
562                 DEBUG(DEBUG_ERR,(__location__ " Invalid db priority : %u\n", priority));
563                 return -1;
564         }
565
566         ctdb_start_freeze(ctdb, priority);
567
568         if (ctdb->freeze_mode[priority] == CTDB_FREEZE_FROZEN) {
569                 DEBUG(DEBUG_ERR, ("Freeze priority %u: frozen\n", priority));
570                 /* we're already frozen */
571                 return 0;
572         }
573
574         if (ctdb->freeze_handles[priority] == NULL) {
575                 DEBUG(DEBUG_ERR,("No freeze lock handle when adding a waiter\n"));
576                 return -1;
577         }
578
579         /* If there are no databases, we are done. */
580         if (ctdb->freeze_handles[priority]->num_total == 0) {
581                 return 0;
582         }
583
584         /* add ourselves to list of waiters */
585         w = talloc(ctdb->freeze_handles[priority], struct ctdb_freeze_waiter);
586         CTDB_NO_MEMORY(ctdb, w);
587         w->ctdb     = ctdb;
588         w->c        = talloc_steal(w, c);
589         w->priority = priority;
590         w->status   = -1;
591         talloc_set_destructor(w, ctdb_freeze_waiter_destructor);
592         DLIST_ADD(ctdb->freeze_handles[priority]->waiters, w);
593
594         /* we won't reply till later */
595         *async_reply = true;
596         return 0;
597 }
598
599
600 static int db_freeze_block(struct ctdb_db_context *ctdb_db, void *private_data)
601 {
602         struct tevent_context *ev = (struct tevent_context *)private_data;
603
604         ctdb_start_db_freeze(ctdb_db);
605
606         while (ctdb_db->freeze_mode == CTDB_FREEZE_PENDING) {
607                 tevent_loop_once(ev);
608         }
609
610         if (ctdb_db->freeze_mode != CTDB_FREEZE_FROZEN) {
611                 return -1;
612         }
613
614         return 0;
615 }
616
617 /*
618   block until we are frozen, used during daemon startup
619  */
620 bool ctdb_blocking_freeze(struct ctdb_context *ctdb)
621 {
622         int ret;
623
624         ret = ctdb_db_iterator(ctdb, db_freeze_block, ctdb->ev);
625         if (ret != 0) {
626                 return false;
627         }
628
629         return true;
630 }
631
632
633 static void thaw_priority(struct ctdb_context *ctdb, uint32_t priority)
634 {
635         DEBUG(DEBUG_ERR,("Thawing priority %u\n", priority));
636
637         /* cancel any pending transactions */
638         if (ctdb->freeze_transaction_started) {
639                 ctdb_db_prio_iterator(ctdb, priority,
640                                       db_transaction_cancel_handler, NULL);
641                 ctdb->freeze_transaction_started = false;
642         }
643
644         ctdb_db_prio_iterator(ctdb, priority, db_thaw, NULL);
645         TALLOC_FREE(ctdb->freeze_handles[priority]);
646 }
647
648 /*
649   thaw the databases
650  */
651 int32_t ctdb_control_thaw(struct ctdb_context *ctdb, uint32_t priority,
652                           bool check_recmode)
653 {
654         if (priority > NUM_DB_PRIORITIES) {
655                 DEBUG(DEBUG_ERR,(__location__ " Invalid db priority : %u\n",
656                                  priority));
657                 return -1;
658         }
659
660         if (check_recmode && ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
661                 DEBUG(DEBUG_ERR, ("Failing to thaw databases while "
662                                   "recovery is active\n"));
663                 return -1;
664         }
665
666         if (priority == 0) {
667                 int i;
668                 for (i=1;i<=NUM_DB_PRIORITIES; i++) {
669                         thaw_priority(ctdb, i);
670                 }
671         } else {
672                 thaw_priority(ctdb, priority);
673         }
674
675         ctdb_call_resend_all(ctdb);
676         return 0;
677 }
678
679 /**
680  * Database transaction wrappers
681  *
682  * These functions are wrappers around transaction start/cancel/commit handlers.
683  */
684
/* Arguments handed to db_start_transaction() through private_data */
struct db_start_transaction_state {
        uint32_t transaction_id;        /* id recorded on the database */
        bool transaction_started;       /* caller-side "already started" flag */
};
689
690 static int db_start_transaction(struct ctdb_db_context *ctdb_db,
691                                 void *private_data)
692 {
693         struct db_start_transaction_state *state =
694                 (struct db_start_transaction_state *)private_data;
695         int ret;
696         bool transaction_started;
697
698         if (ctdb_db->freeze_mode != CTDB_FREEZE_FROZEN) {
699                 DEBUG(DEBUG_ERR,
700                       ("Database %s not frozen, cannot start transaction\n",
701                        ctdb_db->db_name));
702                 return -1;
703         }
704
705         transaction_started = state->transaction_started &
706                               ctdb_db->freeze_transaction_started;
707
708         ret = db_transaction_start_handler(ctdb_db,
709                                            &transaction_started);
710         if (ret != 0) {
711                 return -1;
712         }
713
714         ctdb_db->freeze_transaction_started = true;
715         ctdb_db->freeze_transaction_id = state->transaction_id;
716
717         return 0;
718 }
719
720 static int db_cancel_transaction(struct ctdb_db_context *ctdb_db,
721                                  void *private_data)
722 {
723         int ret;
724
725         ret = db_transaction_cancel_handler(ctdb_db, private_data);
726         if (ret != 0) {
727                 return ret;
728         }
729
730         ctdb_db->freeze_transaction_started = false;
731
732         return 0;
733 }
734
/* Arguments handed to db_commit_transaction() through private_data */
struct db_commit_transaction_state {
        uint32_t transaction_id;        /* must match the id recorded at start */
        int healthy_nodes;              /* passed on to the commit handler */
};
739
740 static int db_commit_transaction(struct ctdb_db_context *ctdb_db,
741                                  void *private_data)
742 {
743         struct db_commit_transaction_state *state =
744                 (struct db_commit_transaction_state *)private_data;
745         int ret;
746
747         if (ctdb_db->freeze_mode != CTDB_FREEZE_FROZEN) {
748                 DEBUG(DEBUG_ERR,
749                       ("Database %s not frozen, cannot commit transaction\n",
750                        ctdb_db->db_name));
751                 return -1;
752         }
753
754         if (!ctdb_db->freeze_transaction_started) {
755                 DEBUG(DEBUG_ERR, ("Transaction not started on %s\n",
756                                   ctdb_db->db_name));
757                 return -1;
758         }
759
760         if (ctdb_db->freeze_transaction_id != state->transaction_id) {
761                 DEBUG(DEBUG_ERR,
762                       ("Incorrect transaction commit id 0x%08x for %s\n",
763                        state->transaction_id, ctdb_db->db_name));
764                 return -1;
765         }
766
767         ret = db_transaction_commit_handler(ctdb_db, &state->healthy_nodes);
768         if (ret != 0) {
769                 return -1;
770         }
771
772         ctdb_db->freeze_transaction_started = false;
773         ctdb_db->freeze_transaction_id = 0;
774         ctdb_db->generation = state->transaction_id;
775         return 0;
776 }
777
778 /**
779  * Start a transaction on a database - used for db recovery
780  */
781 int32_t ctdb_control_db_transaction_start(struct ctdb_context *ctdb,
782                                           TDB_DATA indata)
783 {
784         struct ctdb_control_transdb *w =
785                 (struct ctdb_control_transdb *)indata.dptr;
786         struct ctdb_db_context *ctdb_db;
787         struct db_start_transaction_state state;
788
789         ctdb_db = find_ctdb_db(ctdb, w->db_id);
790         if (ctdb_db == NULL) {
791                 DEBUG(DEBUG_ERR,
792                       ("Transaction start for unknown dbid 0x%08x\n",
793                        w->db_id));
794                 return -1;
795         }
796
797         state.transaction_id = w->transaction_id;
798         state.transaction_started = true;
799
800         return db_start_transaction(ctdb_db, &state);
801 }
802
803 /**
804  * Cancel a transaction on a database - used for db recovery
805  */
806 int32_t ctdb_control_db_transaction_cancel(struct ctdb_context *ctdb,
807                                            TDB_DATA indata)
808 {
809         uint32_t db_id = *(uint32_t *)indata.dptr;
810         struct ctdb_db_context *ctdb_db;
811
812         ctdb_db = find_ctdb_db(ctdb, db_id);
813         if (ctdb_db == NULL) {
814                 DEBUG(DEBUG_ERR,
815                       ("Transaction cancel for unknown dbid 0x%08x\n", db_id));
816                 return -1;
817         }
818
819         DEBUG(DEBUG_ERR, ("Recovery db transaction cancelled for %s\n",
820                           ctdb_db->db_name));
821
822         return db_cancel_transaction(ctdb_db, NULL);
823 }
824
825 /**
826  * Commit a transaction on a database - used for db recovery
827  */
828 int32_t ctdb_control_db_transaction_commit(struct ctdb_context *ctdb,
829                                            TDB_DATA indata)
830 {
831         struct ctdb_control_transdb *w =
832                 (struct ctdb_control_transdb *)indata.dptr;
833         struct ctdb_db_context *ctdb_db;
834         struct db_commit_transaction_state state;
835         int healthy_nodes, i;
836
837         ctdb_db = find_ctdb_db(ctdb, w->db_id);
838         if (ctdb_db == NULL) {
839                 DEBUG(DEBUG_ERR,
840                       ("Transaction commit for unknown dbid 0x%08x\n",
841                        w->db_id));
842                 return -1;
843         }
844
845         healthy_nodes = 0;
846         for (i=0; i < ctdb->num_nodes; i++) {
847                 if (ctdb->nodes[i]->flags == 0) {
848                         healthy_nodes += 1;
849                 }
850         }
851
852         state.transaction_id = w->transaction_id;
853         state.healthy_nodes = healthy_nodes;
854
855         return db_commit_transaction(ctdb_db, &state);
856 }
857
858 /*
859   start a transaction on all databases - used for recovery
860  */
861 int32_t ctdb_control_transaction_start(struct ctdb_context *ctdb, uint32_t id)
862 {
863         struct db_start_transaction_state state;
864         int ret;
865
866         if (!ctdb_db_all_frozen(ctdb)) {
867                 DEBUG(DEBUG_ERR, (__location__
868                       " failing transaction start while not frozen\n"));
869                 return -1;
870         }
871
872         state.transaction_id = id;
873         state.transaction_started = ctdb->freeze_transaction_started;
874
875         ret = ctdb_db_iterator(ctdb, db_start_transaction, &state);
876         if (ret != 0) {
877                 return -1;
878         }
879
880         ctdb->freeze_transaction_started = true;
881         ctdb->freeze_transaction_id = id;
882
883         return 0;
884 }
885
886 /*
887   cancel a transaction for all databases - used for recovery
888  */
889 int32_t ctdb_control_transaction_cancel(struct ctdb_context *ctdb)
890 {
891         DEBUG(DEBUG_ERR,(__location__ " recovery transaction cancelled called\n"));
892
893         ctdb_db_iterator(ctdb, db_cancel_transaction, NULL);
894
895         ctdb->freeze_transaction_started = false;
896
897         return 0;
898 }
899
900 /*
901   commit transactions on all databases
902  */
903 int32_t ctdb_control_transaction_commit(struct ctdb_context *ctdb, uint32_t id)
904 {
905         struct db_commit_transaction_state state;
906         int i;
907         int healthy_nodes = 0;
908         int ret;
909
910         if (!ctdb_db_all_frozen(ctdb)) {
911                 DEBUG(DEBUG_ERR, (__location__
912                       " failing transaction commit while not frozen\n"));
913                 return -1;
914         }
915
916         if (!ctdb->freeze_transaction_started) {
917                 DEBUG(DEBUG_ERR,(__location__ " transaction not started\n"));
918                 return -1;
919         }
920
921         if (id != ctdb->freeze_transaction_id) {
922                 DEBUG(DEBUG_ERR,(__location__ " incorrect transaction id 0x%x in commit\n", id));
923                 return -1;
924         }
925
926         DEBUG(DEBUG_DEBUG,(__location__ " num_nodes[%d]\n", ctdb->num_nodes));
927         for (i=0; i < ctdb->num_nodes; i++) {
928                 DEBUG(DEBUG_DEBUG,(__location__ " node[%d].flags[0x%X]\n",
929                                    i, ctdb->nodes[i]->flags));
930                 if (ctdb->nodes[i]->flags == 0) {
931                         healthy_nodes++;
932                 }
933         }
934         DEBUG(DEBUG_INFO,(__location__ " healthy_nodes[%d]\n", healthy_nodes));
935
936         state.transaction_id = id;
937         state.healthy_nodes = healthy_nodes;
938
939         ret = ctdb_db_iterator(ctdb, db_commit_transaction, &state);
940         if (ret != 0) {
941                 DEBUG(DEBUG_ERR, ("Cancel all transactions\n"));
942                 goto fail;
943         }
944
945         ctdb->freeze_transaction_started = false;
946         ctdb->freeze_transaction_id = 0;
947
948         return 0;
949
950 fail:
951         /* cancel any pending transactions */
952         ctdb_db_iterator(ctdb, db_cancel_transaction, NULL);
953         ctdb->freeze_transaction_started = false;
954
955         return -1;
956 }
957
958 /*
959   wipe a database - only possible when in a frozen transaction
960  */
961 int32_t ctdb_control_wipe_database(struct ctdb_context *ctdb, TDB_DATA indata)
962 {
963         struct ctdb_control_transdb w = *(struct ctdb_control_transdb *)indata.dptr;
964         struct ctdb_db_context *ctdb_db;
965
966         ctdb_db = find_ctdb_db(ctdb, w.db_id);
967         if (!ctdb_db) {
968                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", w.db_id));
969                 return -1;
970         }
971
972         if (ctdb_db->freeze_mode != CTDB_FREEZE_FROZEN) {
973                 DEBUG(DEBUG_ERR,(__location__ " Failed transaction_start while not frozen\n"));
974                 return -1;
975         }
976
977         if (!ctdb_db->freeze_transaction_started) {
978                 DEBUG(DEBUG_ERR,(__location__ " transaction not started\n"));
979                 return -1;
980         }
981
982         if (w.transaction_id != ctdb_db->freeze_transaction_id) {
983                 DEBUG(DEBUG_ERR,(__location__ " incorrect transaction id 0x%x in commit\n", w.transaction_id));
984                 return -1;
985         }
986
987         if (tdb_wipe_all(ctdb_db->ltdb->tdb) != 0) {
988                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database for db '%s'\n",
989                          ctdb_db->db_name));
990                 return -1;
991         }
992
993         if (!ctdb_db->persistent) {
994                 talloc_free(ctdb_db->delete_queue);
995                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
996                 if (ctdb_db->delete_queue == NULL) {
997                         DEBUG(DEBUG_ERR, (__location__ " Failed to re-create "
998                                           "the vacuum tree.\n"));
999                         return -1;
1000                 }
1001         }
1002
1003         return 0;
1004 }
1005
1006 bool ctdb_db_frozen(struct ctdb_db_context *ctdb_db)
1007 {
1008         if (ctdb_db->freeze_mode != CTDB_FREEZE_FROZEN) {
1009                 return false;
1010         }
1011
1012         return true;
1013 }
1014
1015 bool ctdb_db_prio_frozen(struct ctdb_context *ctdb, uint32_t priority)
1016 {
1017         if (priority == 0) {
1018                 priority = 1;
1019         }
1020         if (priority > NUM_DB_PRIORITIES) {
1021                 DEBUG(DEBUG_ERR, ("Invalid DB priority specified\n"));
1022                 return false;
1023         }
1024
1025         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
1026                 return false;
1027         }
1028
1029         return true;
1030 }
1031
1032 bool ctdb_db_all_frozen(struct ctdb_context *ctdb)
1033 {
1034         int i;
1035
1036         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1037                 if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
1038                         return false;
1039                 }
1040         }
1041         return true;
1042 }