ctdb-daemon: Store db_flags instead of individual boolean flags
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
45
46 /**
47  * write a record to a normal database
48  *
49  * This is the server-variant of the ctdb_ltdb_store function.
50  * It contains logic to determine whether a record should be
51  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52  * controls to the local ctdb daemon if apporpriate.
53  */
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
55                                   TDB_DATA key,
56                                   struct ctdb_ltdb_header *header,
57                                   TDB_DATA data)
58 {
59         struct ctdb_context *ctdb = ctdb_db->ctdb;
60         TDB_DATA rec[2];
61         uint32_t hsize = sizeof(struct ctdb_ltdb_header);
62         int ret;
63         bool seqnum_suppressed = false;
64         bool keep = false;
65         bool schedule_for_deletion = false;
66         bool remove_from_delete_queue = false;
67         uint32_t lmaster;
68
69         if (ctdb->flags & CTDB_FLAG_TORTURE) {
70                 TDB_DATA old;
71                 struct ctdb_ltdb_header *h2;
72
73                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74                 h2 = (struct ctdb_ltdb_header *)old.dptr;
75                 if (old.dptr != NULL &&
76                     old.dsize >= hsize &&
77                     h2->rsn > header->rsn) {
78                         DEBUG(DEBUG_ERR,
79                               ("RSN regression! %"PRIu64" %"PRIu64"\n",
80                                h2->rsn, header->rsn));
81                 }
82                 if (old.dptr) {
83                         free(old.dptr);
84                 }
85         }
86
87         if (ctdb->vnn_map == NULL) {
88                 /*
89                  * Called from a client: always store the record
90                  * Also don't call ctdb_lmaster since it uses the vnn_map!
91                  */
92                 keep = true;
93                 goto store;
94         }
95
96         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
97
98         /*
99          * If we migrate an empty record off to another node
100          * and the record has not been migrated with data,
101          * delete the record instead of storing the empty record.
102          */
103         if (data.dsize != 0) {
104                 keep = true;
105         } else if (header->flags & CTDB_REC_RO_FLAGS) {
106                 keep = true;
107         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
108                 /*
109                  * The record is not created by the client but
110                  * automatically by the ctdb_ltdb_fetch logic that
111                  * creates a record with an initial header in the
112                  * ltdb before trying to migrate the record from
113                  * the current lmaster. Keep it instead of trying
114                  * to delete the non-existing record...
115                  */
116                 keep = true;
117                 schedule_for_deletion = true;
118         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
119                 keep = true;
120         } else if (ctdb_db->ctdb->pnn == lmaster) {
121                 /*
122                  * If we are lmaster, then we usually keep the record.
123                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
124                  * and the record is empty and has never been migrated
125                  * with data, then we should delete it instead of storing it.
126                  * This is part of the vacuuming process.
127                  *
128                  * The reason that we usually need to store even empty records
129                  * on the lmaster is that a client operating directly on the
130                  * lmaster (== dmaster) expects the local copy of the record to
131                  * exist after successful ctdb migrate call. If the record does
132                  * not exist, the client goes into a migrate loop and eventually
133                  * fails. So storing the empty record makes sure that we do not
134                  * need to change the client code.
135                  */
136                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
137                         keep = true;
138                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
139                         keep = true;
140                 }
141         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
142                 keep = true;
143         }
144
145         if (keep) {
146                 if (ctdb_db_volatile(ctdb_db) &&
147                     (ctdb_db->ctdb->pnn == header->dmaster) &&
148                     !(header->flags & CTDB_REC_RO_FLAGS))
149                 {
150                         header->rsn++;
151
152                         if (data.dsize == 0) {
153                                 schedule_for_deletion = true;
154                         }
155                 }
156                 remove_from_delete_queue = !schedule_for_deletion;
157         }
158
159 store:
160         /*
161          * The VACUUM_MIGRATED flag is only set temporarily for
162          * the above logic when the record was retrieved by a
163          * VACUUM_MIGRATE call and should not be stored in the
164          * database.
165          *
166          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
167          * and there are two cases in which the corresponding record
168          * is stored in the local database:
169          * 1. The record has been migrated with data in the past
170          *    (the MIGRATED_WITH_DATA record flag is set).
171          * 2. The record has been filled with data again since it
172          *    had been submitted in the VACUUM_FETCH message to the
173          *    lmaster.
174          * For such records it is important to not store the
175          * VACUUM_MIGRATED flag in the database.
176          */
177         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
178
179         /*
180          * Similarly, clear the AUTOMATIC flag which should not enter
181          * the local database copy since this would require client
182          * modifications to clear the flag when the client stores
183          * the record.
184          */
185         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
186
187         rec[0].dsize = hsize;
188         rec[0].dptr = (uint8_t *)header;
189
190         rec[1].dsize = data.dsize;
191         rec[1].dptr = data.dptr;
192
193         /* Databases with seqnum updates enabled only get their seqnum
194            changes when/if we modify the data */
195         if (ctdb_db->seqnum_update != NULL) {
196                 TDB_DATA old;
197                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
198
199                 if ((old.dsize == hsize + data.dsize) &&
200                     memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
201                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
202                         seqnum_suppressed = true;
203                 }
204                 if (old.dptr != NULL) {
205                         free(old.dptr);
206                 }
207         }
208
209         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
210                             ctdb_db->db_name,
211                             keep?"storing":"deleting",
212                             ctdb_hash(&key)));
213
214         if (keep) {
215                 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
216         } else {
217                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
218         }
219
220         if (ret != 0) {
221                 int lvl = DEBUG_ERR;
222
223                 if (keep == false &&
224                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
225                 {
226                         lvl = DEBUG_DEBUG;
227                 }
228
229                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
230                             "%d - %s\n",
231                             ctdb_db->db_name,
232                             keep?"store":"delete", ret,
233                             tdb_errorstr(ctdb_db->ltdb->tdb)));
234
235                 schedule_for_deletion = false;
236                 remove_from_delete_queue = false;
237         }
238         if (seqnum_suppressed) {
239                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
240         }
241
242         if (schedule_for_deletion) {
243                 int ret2;
244                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
245                 if (ret2 != 0) {
246                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
247                 }
248         }
249
250         if (remove_from_delete_queue) {
251                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
252         }
253
254         return ret;
255 }
256
257 struct lock_fetch_state {
258         struct ctdb_context *ctdb;
259         struct ctdb_db_context *ctdb_db;
260         void (*recv_pkt)(void *, struct ctdb_req_header *);
261         void *recv_context;
262         struct ctdb_req_header *hdr;
263         uint32_t generation;
264         bool ignore_generation;
265 };
266
267 /*
268   called when we should retry the operation
269  */
270 static void lock_fetch_callback(void *p, bool locked)
271 {
272         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
273         if (!state->ignore_generation &&
274             state->generation != state->ctdb_db->generation) {
275                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
276                 talloc_free(state->hdr);
277                 return;
278         }
279         state->recv_pkt(state->recv_context, state->hdr);
280         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
281 }
282
283
284 /*
285   do a non-blocking ltdb_lock, deferring this ctdb request until we
286   have the chainlock
287
288   It does the following:
289
290    1) tries to get the chainlock. If it succeeds, then it returns 0
291
292    2) if it fails to get a chainlock immediately then it sets up a
293    non-blocking chainlock via ctdb_lock_record, and when it gets the
294    chainlock it re-submits this ctdb request to the main packet
295    receive function.
296
297    This effectively queues all ctdb requests that cannot be
298    immediately satisfied until it can get the lock. This means that
299    the main ctdb daemon will not block waiting for a chainlock held by
300    a client
301
302    There are 3 possible return values:
303
304        0:    means that it got the lock immediately.
305       -1:    means that it failed to get the lock, and won't retry
306       -2:    means that it failed to get the lock immediately, but will retry
307  */
308 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
309                            TDB_DATA key, struct ctdb_req_header *hdr,
310                            void (*recv_pkt)(void *, struct ctdb_req_header *),
311                            void *recv_context, bool ignore_generation)
312 {
313         int ret;
314         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
315         struct lock_request *lreq;
316         struct lock_fetch_state *state;
317         
318         ret = tdb_chainlock_nonblock(tdb, key);
319
320         if (ret != 0 &&
321             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
322                 /* a hard failure - don't try again */
323                 return -1;
324         }
325
326         /* when torturing, ensure we test the contended path */
327         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
328             random() % 5 == 0) {
329                 ret = -1;
330                 tdb_chainunlock(tdb, key);
331         }
332
333         /* first the non-contended path */
334         if (ret == 0) {
335                 return 0;
336         }
337
338         state = talloc(hdr, struct lock_fetch_state);
339         state->ctdb = ctdb_db->ctdb;
340         state->ctdb_db = ctdb_db;
341         state->hdr = hdr;
342         state->recv_pkt = recv_pkt;
343         state->recv_context = recv_context;
344         state->generation = ctdb_db->generation;
345         state->ignore_generation = ignore_generation;
346
347         /* now the contended path */
348         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
349         if (lreq == NULL) {
350                 return -1;
351         }
352
353         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
354            so it won't be freed yet */
355         talloc_steal(state, hdr);
356
357         /* now tell the caller than we will retry asynchronously */
358         return -2;
359 }
360
361 /*
362   a varient of ctdb_ltdb_lock_requeue that also fetches the record
363  */
364 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
365                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
366                                  struct ctdb_req_header *hdr, TDB_DATA *data,
367                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
368                                  void *recv_context, bool ignore_generation)
369 {
370         int ret;
371
372         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
373                                      recv_context, ignore_generation);
374         if (ret == 0) {
375                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
376                 if (ret != 0) {
377                         int uret;
378                         uret = ctdb_ltdb_unlock(ctdb_db, key);
379                         if (uret != 0) {
380                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
381                         }
382                 }
383         }
384         return ret;
385 }
386
387
388 /*
389   paraoid check to see if the db is empty
390  */
391 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
392 {
393         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
394         int count = tdb_traverse_read(tdb, NULL, NULL);
395         if (count != 0) {
396                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
397                          ctdb_db->db_path));
398                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
399         }
400 }
401
402 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
403                                 struct ctdb_db_context *ctdb_db)
404 {
405         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
406         char *old;
407         char *reason = NULL;
408         TDB_DATA key;
409         TDB_DATA val;
410
411         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
412         key.dsize = strlen(ctdb_db->db_name);
413
414         old = ctdb_db->unhealthy_reason;
415         ctdb_db->unhealthy_reason = NULL;
416
417         val = tdb_fetch(tdb, key);
418         if (val.dsize > 0) {
419                 reason = talloc_strndup(ctdb_db,
420                                         (const char *)val.dptr,
421                                         val.dsize);
422                 if (reason == NULL) {
423                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
424                                            (int)val.dsize));
425                         ctdb_db->unhealthy_reason = old;
426                         free(val.dptr);
427                         return -1;
428                 }
429         }
430
431         if (val.dptr) {
432                 free(val.dptr);
433         }
434
435         talloc_free(old);
436         ctdb_db->unhealthy_reason = reason;
437         return 0;
438 }
439
440 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
441                                   struct ctdb_db_context *ctdb_db,
442                                   const char *given_reason,/* NULL means healthy */
443                                   int num_healthy_nodes)
444 {
445         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
446         int ret;
447         TDB_DATA key;
448         TDB_DATA val;
449         char *new_reason = NULL;
450         char *old_reason = NULL;
451
452         ret = tdb_transaction_start(tdb);
453         if (ret != 0) {
454                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
455                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
456                 return -1;
457         }
458
459         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
460         if (ret != 0) {
461                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
462                                    ctdb_db->db_name, ret));
463                 return -1;
464         }
465         old_reason = ctdb_db->unhealthy_reason;
466
467         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
468         key.dsize = strlen(ctdb_db->db_name);
469
470         if (given_reason) {
471                 new_reason = talloc_strdup(ctdb_db, given_reason);
472                 if (new_reason == NULL) {
473                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
474                                           given_reason));
475                         return -1;
476                 }
477         } else if (old_reason && num_healthy_nodes == 0) {
478                 /*
479                  * If the reason indicates ok, but there where no healthy nodes
480                  * available, that it means, we have not recovered valid content
481                  * of the db. So if there's an old reason, prefix it with
482                  * "NO-HEALTHY-NODES - "
483                  */
484                 const char *prefix;
485
486 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
487                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
488                 if (ret != 0) {
489                         prefix = _TMP_PREFIX;
490                 } else {
491                         prefix = "";
492                 }
493                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
494                                          prefix, old_reason);
495                 if (new_reason == NULL) {
496                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
497                                           prefix, old_reason));
498                         return -1;
499                 }
500 #undef _TMP_PREFIX
501         }
502
503         if (new_reason) {
504                 val.dptr = discard_const_p(uint8_t, new_reason);
505                 val.dsize = strlen(new_reason);
506
507                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
508                 if (ret != 0) {
509                         tdb_transaction_cancel(tdb);
510                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
511                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
512                                            ret, tdb_errorstr(tdb)));
513                         talloc_free(new_reason);
514                         return -1;
515                 }
516                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
517                                    ctdb_db->db_name, new_reason));
518         } else if (old_reason) {
519                 ret = tdb_delete(tdb, key);
520                 if (ret != 0) {
521                         tdb_transaction_cancel(tdb);
522                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
523                                            tdb_name(tdb), ctdb_db->db_name,
524                                            ret, tdb_errorstr(tdb)));
525                         talloc_free(new_reason);
526                         return -1;
527                 }
528                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
529                                    ctdb_db->db_name));
530         }
531
532         ret = tdb_transaction_commit(tdb);
533         if (ret != TDB_SUCCESS) {
534                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
535                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
536                 talloc_free(new_reason);
537                 return -1;
538         }
539
540         talloc_free(old_reason);
541         ctdb_db->unhealthy_reason = new_reason;
542
543         return 0;
544 }
545
546 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
547                                      struct ctdb_db_context *ctdb_db)
548 {
549         time_t now = time(NULL);
550         char *new_path;
551         char *new_reason;
552         int ret;
553         struct tm *tm;
554
555         tm = gmtime(&now);
556
557         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
558         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
559                                    "%04u%02u%02u%02u%02u%02u.0Z",
560                                    ctdb_db->db_path,
561                                    tm->tm_year+1900, tm->tm_mon+1,
562                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
563                                    tm->tm_sec);
564         if (new_path == NULL) {
565                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
566                 return -1;
567         }
568
569         new_reason = talloc_asprintf(ctdb_db,
570                                      "ERROR - Backup of corrupted TDB in '%s'",
571                                      new_path);
572         if (new_reason == NULL) {
573                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
574                 return -1;
575         }
576         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
577         talloc_free(new_reason);
578         if (ret != 0) {
579                 DEBUG(DEBUG_CRIT,(__location__
580                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
581                                  ctdb_db->db_path));
582                 return -1;
583         }
584
585         ret = rename(ctdb_db->db_path, new_path);
586         if (ret != 0) {
587                 DEBUG(DEBUG_CRIT,(__location__
588                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
589                                   ctdb_db->db_path, new_path,
590                                   errno, strerror(errno)));
591                 talloc_free(new_path);
592                 return -1;
593         }
594
595         DEBUG(DEBUG_CRIT,(__location__
596                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
597                          ctdb_db->db_path, new_path));
598         talloc_free(new_path);
599         return 0;
600 }
601
602 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
603 {
604         struct ctdb_db_context *ctdb_db;
605         int ret;
606         int ok = 0;
607         int fail = 0;
608
609         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
610                 if (!ctdb_db_persistent(ctdb_db)) {
611                         continue;
612                 }
613
614                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
615                 if (ret != 0) {
616                         DEBUG(DEBUG_ALERT,(__location__
617                                            " load persistent health for '%s' failed\n",
618                                            ctdb_db->db_path));
619                         return -1;
620                 }
621
622                 if (ctdb_db->unhealthy_reason == NULL) {
623                         ok++;
624                         DEBUG(DEBUG_INFO,(__location__
625                                    " persistent db '%s' healthy\n",
626                                    ctdb_db->db_path));
627                         continue;
628                 }
629
630                 fail++;
631                 DEBUG(DEBUG_ALERT,(__location__
632                                    " persistent db '%s' unhealthy: %s\n",
633                                    ctdb_db->db_path,
634                                    ctdb_db->unhealthy_reason));
635         }
636         DEBUG(DEBUG_NOTICE,
637               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
638                ok, fail));
639
640         if (fail != 0) {
641                 return -1;
642         }
643
644         return 0;
645 }
646
647
648 /*
649   mark a database - as healthy
650  */
651 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
652 {
653         uint32_t db_id = *(uint32_t *)indata.dptr;
654         struct ctdb_db_context *ctdb_db;
655         int ret;
656         bool may_recover = false;
657
658         ctdb_db = find_ctdb_db(ctdb, db_id);
659         if (!ctdb_db) {
660                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
661                 return -1;
662         }
663
664         if (ctdb_db->unhealthy_reason) {
665                 may_recover = true;
666         }
667
668         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
669         if (ret != 0) {
670                 DEBUG(DEBUG_ERR,(__location__
671                                  " ctdb_update_persistent_health(%s) failed\n",
672                                  ctdb_db->db_name));
673                 return -1;
674         }
675
676         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
677                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
678                                   ctdb_db->db_name));
679                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
680         }
681
682         return 0;
683 }
684
685 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
686                                    TDB_DATA indata,
687                                    TDB_DATA *outdata)
688 {
689         uint32_t db_id = *(uint32_t *)indata.dptr;
690         struct ctdb_db_context *ctdb_db;
691         int ret;
692
693         ctdb_db = find_ctdb_db(ctdb, db_id);
694         if (!ctdb_db) {
695                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
696                 return -1;
697         }
698
699         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
700         if (ret != 0) {
701                 DEBUG(DEBUG_ERR,(__location__
702                                  " ctdb_load_persistent_health(%s) failed\n",
703                                  ctdb_db->db_name));
704                 return -1;
705         }
706
707         *outdata = tdb_null;
708         if (ctdb_db->unhealthy_reason) {
709                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
710                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
711         }
712
713         return 0;
714 }
715
716
717 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
718 {
719         char *ropath;
720
721         if (ctdb_db_readonly(ctdb_db)) {
722                 return 0;
723         }
724
725         if (! ctdb_db_volatile(ctdb_db)) {
726                 DEBUG(DEBUG_ERR,
727                       ("Non-volatile databases do not support readonly flag\n"));
728                 return -1;
729         }
730
731         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
732         if (ropath == NULL) {
733                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
734                 return -1;
735         }
736         ctdb_db->rottdb = tdb_open(ropath, 
737                               ctdb->tunable.database_hash_size, 
738                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
739                               O_CREAT|O_RDWR, 0600);
740         if (ctdb_db->rottdb == NULL) {
741                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
742                 talloc_free(ropath);
743                 return -1;
744         }
745
746         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
747
748         ctdb_db_set_readonly(ctdb_db);
749
750         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
751
752         talloc_free(ropath);
753         return 0;
754 }
755
756 /*
757   attach to a database, handling both persistent and non-persistent databases
758   return 0 on success, -1 on failure
759  */
760 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
761                              bool persistent, const char *unhealthy_reason)
762 {
763         struct ctdb_db_context *ctdb_db, *tmp_db;
764         int ret;
765         struct TDB_DATA key;
766         int tdb_flags;
767         int mode = 0600;
768         int remaining_tries = 0;
769         uint8_t db_flags = 0;
770
771         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
772         CTDB_NO_MEMORY(ctdb, ctdb_db);
773
774         ctdb_db->ctdb = ctdb;
775         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
776         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
777
778         key.dsize = strlen(db_name)+1;
779         key.dptr  = discard_const(db_name);
780         ctdb_db->db_id = ctdb_hash(&key);
781         if (persistent) {
782                 ctdb_db->db_flags = CTDB_DB_FLAGS_PERSISTENT;
783         }
784
785         if (ctdb_db_volatile(ctdb_db)) {
786                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
787                 if (ctdb_db->delete_queue == NULL) {
788                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
789                 }
790
791                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
792         }
793
794         /* check for hash collisions */
795         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
796                 if (tmp_db->db_id == ctdb_db->db_id) {
797                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
798                                  tmp_db->db_id, db_name, tmp_db->db_name));
799                         talloc_free(ctdb_db);
800                         return -1;
801                 }
802         }
803
804         if (persistent) {
805                 if (unhealthy_reason) {
806                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
807                                                             unhealthy_reason, 0);
808                         if (ret != 0) {
809                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
810                                                    ctdb_db->db_name, unhealthy_reason, ret));
811                                 talloc_free(ctdb_db);
812                                 return -1;
813                         }
814                 }
815
816                 if (ctdb->max_persistent_check_errors > 0) {
817                         remaining_tries = 1;
818                 }
819                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
820                         remaining_tries = 0;
821                 }
822
823                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
824                 if (ret != 0) {
825                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
826                                    ctdb_db->db_name, ret));
827                         talloc_free(ctdb_db);
828                         return -1;
829                 }
830         }
831
832         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
833                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
834                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
835                 talloc_free(ctdb_db);
836                 return -1;
837         }
838
839         if (ctdb_db->unhealthy_reason) {
840                 /* this is just a warning, but we want that in the log file! */
841                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
842                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
843         }
844
845         /* open the database */
846         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
847                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
848                                            db_name, ctdb->pnn);
849
850         if (persistent) {
851                 db_flags = CTDB_DB_FLAGS_PERSISTENT;
852         }
853
854         tdb_flags = ctdb_db_tdb_flags(db_flags, ctdb->valgrinding,
855                                       ctdb->tunable.mutex_enabled);
856
857 again:
858         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
859                                       ctdb->tunable.database_hash_size, 
860                                       tdb_flags, 
861                                       O_CREAT|O_RDWR, mode);
862         if (ctdb_db->ltdb == NULL) {
863                 struct stat st;
864                 int saved_errno = errno;
865
866                 if (!persistent) {
867                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
868                                           ctdb_db->db_path,
869                                           saved_errno,
870                                           strerror(saved_errno)));
871                         talloc_free(ctdb_db);
872                         return -1;
873                 }
874
875                 if (remaining_tries == 0) {
876                         DEBUG(DEBUG_CRIT,(__location__
877                                           "Failed to open persistent tdb '%s': %d - %s\n",
878                                           ctdb_db->db_path,
879                                           saved_errno,
880                                           strerror(saved_errno)));
881                         talloc_free(ctdb_db);
882                         return -1;
883                 }
884
885                 ret = stat(ctdb_db->db_path, &st);
886                 if (ret != 0) {
887                         DEBUG(DEBUG_CRIT,(__location__
888                                           "Failed to open persistent tdb '%s': %d - %s\n",
889                                           ctdb_db->db_path,
890                                           saved_errno,
891                                           strerror(saved_errno)));
892                         talloc_free(ctdb_db);
893                         return -1;
894                 }
895
896                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
897                 if (ret != 0) {
898                         DEBUG(DEBUG_CRIT,(__location__
899                                           "Failed to open persistent tdb '%s': %d - %s\n",
900                                           ctdb_db->db_path,
901                                           saved_errno,
902                                           strerror(saved_errno)));
903                         talloc_free(ctdb_db);
904                         return -1;
905                 }
906
907                 remaining_tries--;
908                 mode = st.st_mode;
909                 goto again;
910         }
911
912         if (!persistent) {
913                 ctdb_check_db_empty(ctdb_db);
914         } else {
915                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
916                 if (ret != 0) {
917                         int fd;
918                         struct stat st;
919
920                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
921                                           ctdb_db->db_path, ret,
922                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
923                         if (remaining_tries == 0) {
924                                 talloc_free(ctdb_db);
925                                 return -1;
926                         }
927
928                         fd = tdb_fd(ctdb_db->ltdb->tdb);
929                         ret = fstat(fd, &st);
930                         if (ret != 0) {
931                                 DEBUG(DEBUG_CRIT,(__location__
932                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
933                                                   ctdb_db->db_path,
934                                                   errno,
935                                                   strerror(errno)));
936                                 talloc_free(ctdb_db);
937                                 return -1;
938                         }
939
940                         /* close the TDB */
941                         talloc_free(ctdb_db->ltdb);
942                         ctdb_db->ltdb = NULL;
943
944                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
945                         if (ret != 0) {
946                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
947                                                   ctdb_db->db_path));
948                                 talloc_free(ctdb_db);
949                                 return -1;
950                         }
951
952                         remaining_tries--;
953                         mode = st.st_mode;
954                         goto again;
955                 }
956         }
957
958         /* remember the flags the client has specified */
959         tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
960
961
962         /* set up a rb tree we can use to track which records we have a 
963            fetch-lock in-flight for so we can defer any additional calls
964            for the same record.
965          */
966         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
967         if (ctdb_db->deferred_fetch == NULL) {
968                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
969                 talloc_free(ctdb_db);
970                 return -1;
971         }
972
973         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
974         if (ctdb_db->defer_dmaster == NULL) {
975                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
976                                   ctdb_db->db_name));
977                 talloc_free(ctdb_db);
978                 return -1;
979         }
980
981         DLIST_ADD(ctdb->db_list, ctdb_db);
982
983         /* setting this can help some high churn databases */
984         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
985
986         /* 
987            all databases support the "null" function. we need this in
988            order to do forced migration of records
989         */
990         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
991         if (ret != 0) {
992                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
993                 talloc_free(ctdb_db);
994                 return -1;
995         }
996
997         /* 
998            all databases support the "fetch" function. we need this
999            for efficient Samba3 ctdb fetch
1000         */
1001         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1002         if (ret != 0) {
1003                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1004                 talloc_free(ctdb_db);
1005                 return -1;
1006         }
1007
1008         /* 
1009            all databases support the "fetch_with_header" function. we need this
1010            for efficient readonly record fetches
1011         */
1012         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1013         if (ret != 0) {
1014                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1015                 talloc_free(ctdb_db);
1016                 return -1;
1017         }
1018
1019         ret = ctdb_vacuum_init(ctdb_db);
1020         if (ret != 0) {
1021                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1022                                   "database '%s'\n", ctdb_db->db_name));
1023                 talloc_free(ctdb_db);
1024                 return -1;
1025         }
1026
1027         ret = ctdb_migration_init(ctdb_db);
1028         if (ret != 0) {
1029                 DEBUG(DEBUG_ERR,
1030                       ("Failed to setup migration tracking for db '%s'\n",
1031                        ctdb_db->db_name));
1032                 talloc_free(ctdb_db);
1033                 return -1;
1034         }
1035
1036         ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1037                            &ctdb_db->lock_log);
1038         if (ret != 0) {
1039                 DEBUG(DEBUG_ERR,
1040                       ("Failed to setup lock logging for db '%s'\n",
1041                        ctdb_db->db_name));
1042                 talloc_free(ctdb_db);
1043                 return -1;
1044         }
1045
1046         ctdb_db->generation = ctdb->vnn_map->generation;
1047
1048         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1049                             ctdb_db->db_path, tdb_flags));
1050
1051         /* success */
1052         return 0;
1053 }
1054
1055
1056 struct ctdb_deferred_attach_context {
1057         struct ctdb_deferred_attach_context *next, *prev;
1058         struct ctdb_context *ctdb;
1059         struct ctdb_req_control_old *c;
1060 };
1061
1062
1063 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1064 {
1065         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1066
1067         return 0;
1068 }
1069
1070 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1071                                          struct tevent_timer *te,
1072                                          struct timeval t, void *private_data)
1073 {
1074         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1075         struct ctdb_context *ctdb = da_ctx->ctdb;
1076
1077         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1078         talloc_free(da_ctx);
1079 }
1080
1081 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1082                                           struct tevent_timer *te,
1083                                           struct timeval t, void *private_data)
1084 {
1085         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1086         struct ctdb_context *ctdb = da_ctx->ctdb;
1087
1088         /* This talloc-steals the packet ->c */
1089         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1090         talloc_free(da_ctx);
1091 }
1092
1093 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1094 {
1095         struct ctdb_deferred_attach_context *da_ctx;
1096
1097         /* call it from the main event loop as soon as the current event 
1098            finishes.
1099          */
1100         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1101                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1102                 tevent_add_timer(ctdb->ev, da_ctx,
1103                                  timeval_current_ofs(1,0),
1104                                  ctdb_deferred_attach_callback, da_ctx);
1105         }
1106
1107         return 0;
1108 }
1109
1110 /*
1111   a client has asked to attach a new database
1112  */
1113 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1114                                TDB_DATA *outdata,
1115                                bool persistent, uint32_t client_id,
1116                                struct ctdb_req_control_old *c,
1117                                bool *async_reply)
1118 {
1119         const char *db_name = (const char *)indata.dptr;
1120         struct ctdb_db_context *db;
1121         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1122         struct ctdb_client *client = NULL;
1123
1124         if (ctdb->tunable.allow_client_db_attach == 0) {
1125                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1126                                   "AllowClientDBAccess == 0\n", db_name));
1127                 return -1;
1128         }
1129
1130         /* don't allow any local clients to attach while we are in recovery mode
1131          * except for the recovery daemon.
1132          * allow all attach from the network since these are always from remote
1133          * recovery daemons.
1134          */
1135         if (client_id != 0) {
1136                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1137         }
1138         if (client != NULL) {
1139                 /* If the node is inactive it is not part of the cluster
1140                    and we should not allow clients to attach to any
1141                    databases
1142                 */
1143                 if (node->flags & NODE_FLAGS_INACTIVE) {
1144                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1145                         return -1;
1146                 }
1147
1148                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1149                     client->pid != ctdb->recoverd_pid &&
1150                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1151                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1152
1153                         if (da_ctx == NULL) {
1154                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1155                                 return -1;
1156                         }
1157
1158                         da_ctx->ctdb = ctdb;
1159                         da_ctx->c = talloc_steal(da_ctx, c);
1160                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1161                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1162
1163                         tevent_add_timer(ctdb->ev, da_ctx,
1164                                          timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1165                                          ctdb_deferred_attach_timeout, da_ctx);
1166
1167                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1168                         *async_reply = true;
1169                         return 0;
1170                 }
1171         }
1172
1173         /* see if we already have this name */
1174         db = ctdb_db_handle(ctdb, db_name);
1175         if (db) {
1176                 if (ctdb_db_persistent(db) != persistent) {
1177                         DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1178                                           "database %s\n", persistent ? "" : "non-",
1179                                           ctdb_db_persistent(db) ? "" : "non-", db_name));
1180                         return -1;
1181                 }
1182                 outdata->dptr  = (uint8_t *)&db->db_id;
1183                 outdata->dsize = sizeof(db->db_id);
1184                 return 0;
1185         }
1186
1187         if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
1188                 return -1;
1189         }
1190
1191         db = ctdb_db_handle(ctdb, db_name);
1192         if (!db) {
1193                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1194                 return -1;
1195         }
1196
1197         outdata->dptr  = (uint8_t *)&db->db_id;
1198         outdata->dsize = sizeof(db->db_id);
1199
1200         /* Try to ensure it's locked in mem */
1201         lockdown_memory(ctdb->valgrinding);
1202
1203         /* tell all the other nodes about this database */
1204         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1205                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1206                                                 CTDB_CONTROL_DB_ATTACH,
1207                                  0, CTDB_CTRL_FLAG_NOREPLY,
1208                                  indata, NULL, NULL);
1209
1210         /* success */
1211         return 0;
1212 }
1213
1214 /*
1215  * a client has asked to detach from a database
1216  */
1217 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1218                                uint32_t client_id)
1219 {
1220         uint32_t db_id;
1221         struct ctdb_db_context *ctdb_db;
1222         struct ctdb_client *client = NULL;
1223
1224         db_id = *(uint32_t *)indata.dptr;
1225         ctdb_db = find_ctdb_db(ctdb, db_id);
1226         if (ctdb_db == NULL) {
1227                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1228                                   db_id));
1229                 return -1;
1230         }
1231
1232         if (ctdb->tunable.allow_client_db_attach == 1) {
1233                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1234                                   "Clients are allowed access to databases "
1235                                   "(AllowClientDBAccess == 1)\n",
1236                                   ctdb_db->db_name));
1237                 return -1;
1238         }
1239
1240         if (! ctdb_db_volatile(ctdb_db)) {
1241                 DEBUG(DEBUG_ERR,
1242                       ("Detaching non-volatile database %s denied\n",
1243                        ctdb_db->db_name));
1244                 return -1;
1245         }
1246
1247         /* Cannot detach from database when in recovery */
1248         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1249                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1250                 return -1;
1251         }
1252
1253         /* If a control comes from a client, then broadcast it to all nodes.
1254          * Do the actual detach only if the control comes from other daemons.
1255          */
1256         if (client_id != 0) {
1257                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1258                 if (client != NULL) {
1259                         /* forward the control to all the nodes */
1260                         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1261                                                  CTDB_CONTROL_DB_DETACH, 0,
1262                                                  CTDB_CTRL_FLAG_NOREPLY,
1263                                                  indata, NULL, NULL);
1264                         return 0;
1265                 }
1266                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1267                                   "for database '%s'\n", ctdb_db->db_name));
1268                 return -1;
1269         }
1270
1271         /* Detach database from recoverd */
1272         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1273                                      CTDB_SRVID_DETACH_DATABASE,
1274                                      indata) != 0) {
1275                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1276                 return -1;
1277         }
1278
1279         /* Disable vacuuming and drop all vacuuming data */
1280         talloc_free(ctdb_db->vacuum_handle);
1281         talloc_free(ctdb_db->delete_queue);
1282
1283         /* Terminate any deferred fetch */
1284         talloc_free(ctdb_db->deferred_fetch);
1285
1286         /* Terminate any traverses */
1287         while (ctdb_db->traverse) {
1288                 talloc_free(ctdb_db->traverse);
1289         }
1290
1291         /* Terminate any revokes */
1292         while (ctdb_db->revokechild_active) {
1293                 talloc_free(ctdb_db->revokechild_active);
1294         }
1295
1296         /* Free readonly tracking database */
1297         if (ctdb_db_readonly(ctdb_db)) {
1298                 talloc_free(ctdb_db->rottdb);
1299         }
1300
1301         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1302
1303         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1304                              ctdb_db->db_name));
1305         talloc_free(ctdb_db);
1306
1307         return 0;
1308 }
1309
1310 /*
1311   attach to all existing persistent databases
1312  */
1313 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1314                                   const char *unhealthy_reason)
1315 {
1316         DIR *d;
1317         struct dirent *de;
1318
1319         /* open the persistent db directory and scan it for files */
1320         d = opendir(ctdb->db_directory_persistent);
1321         if (d == NULL) {
1322                 return 0;
1323         }
1324
1325         while ((de=readdir(d))) {
1326                 char *p, *s, *q;
1327                 size_t len = strlen(de->d_name);
1328                 uint32_t node;
1329                 int invalid_name = 0;
1330                 
1331                 s = talloc_strdup(ctdb, de->d_name);
1332                 if (s == NULL) {
1333                         closedir(d);
1334                         CTDB_NO_MEMORY(ctdb, s);
1335                 }
1336
1337                 /* only accept names ending in .tdb */
1338                 p = strstr(s, ".tdb.");
1339                 if (len < 7 || p == NULL) {
1340                         talloc_free(s);
1341                         continue;
1342                 }
1343
1344                 /* only accept names ending with .tdb. and any number of digits */
1345                 q = p+5;
1346                 while (*q != 0 && invalid_name == 0) {
1347                         if (!isdigit(*q++)) {
1348                                 invalid_name = 1;
1349                         }
1350                 }
1351                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1352                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1353                         talloc_free(s);
1354                         continue;
1355                 }
1356                 p[4] = 0;
1357
1358                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
1359                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1360                         closedir(d);
1361                         talloc_free(s);
1362                         return -1;
1363                 }
1364
1365                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1366
1367                 talloc_free(s);
1368         }
1369         closedir(d);
1370         return 0;
1371 }
1372
1373 int ctdb_attach_databases(struct ctdb_context *ctdb)
1374 {
1375         int ret;
1376         char *persistent_health_path = NULL;
1377         char *unhealthy_reason = NULL;
1378         bool first_try = true;
1379
1380         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1381                                                  ctdb->db_directory_state,
1382                                                  PERSISTENT_HEALTH_TDB,
1383                                                  ctdb->pnn);
1384         if (persistent_health_path == NULL) {
1385                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1386                 return -1;
1387         }
1388
1389 again:
1390
1391         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1392                                                    0, TDB_DISALLOW_NESTING,
1393                                                    O_CREAT | O_RDWR, 0600);
1394         if (ctdb->db_persistent_health == NULL) {
1395                 struct tdb_wrap *tdb;
1396
1397                 if (!first_try) {
1398                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1399                                           persistent_health_path,
1400                                           errno,
1401                                           strerror(errno)));
1402                         talloc_free(persistent_health_path);
1403                         talloc_free(unhealthy_reason);
1404                         return -1;
1405                 }
1406                 first_try = false;
1407
1408                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1409                                                    persistent_health_path,
1410                                                    "was cleared after a failure",
1411                                                    "manual verification needed");
1412                 if (unhealthy_reason == NULL) {
1413                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1414                         talloc_free(persistent_health_path);
1415                         return -1;
1416                 }
1417
1418                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1419                                   persistent_health_path));
1420                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1421                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1422                                     O_CREAT | O_RDWR, 0600);
1423                 if (tdb) {
1424                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1425                                           persistent_health_path,
1426                                           errno,
1427                                           strerror(errno)));
1428                         talloc_free(persistent_health_path);
1429                         talloc_free(unhealthy_reason);
1430                         return -1;
1431                 }
1432
1433                 talloc_free(tdb);
1434                 goto again;
1435         }
1436         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1437         if (ret != 0) {
1438                 struct tdb_wrap *tdb;
1439
1440                 talloc_free(ctdb->db_persistent_health);
1441                 ctdb->db_persistent_health = NULL;
1442
1443                 if (!first_try) {
1444                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1445                                           persistent_health_path));
1446                         talloc_free(persistent_health_path);
1447                         talloc_free(unhealthy_reason);
1448                         return -1;
1449                 }
1450                 first_try = false;
1451
1452                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1453                                                    persistent_health_path,
1454                                                    "was cleared after a failure",
1455                                                    "manual verification needed");
1456                 if (unhealthy_reason == NULL) {
1457                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1458                         talloc_free(persistent_health_path);
1459                         return -1;
1460                 }
1461
1462                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1463                                   persistent_health_path));
1464                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1465                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1466                                     O_CREAT | O_RDWR, 0600);
1467                 if (tdb) {
1468                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1469                                           persistent_health_path,
1470                                           errno,
1471                                           strerror(errno)));
1472                         talloc_free(persistent_health_path);
1473                         talloc_free(unhealthy_reason);
1474                         return -1;
1475                 }
1476
1477                 talloc_free(tdb);
1478                 goto again;
1479         }
1480         talloc_free(persistent_health_path);
1481
1482         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1483         talloc_free(unhealthy_reason);
1484         if (ret != 0) {
1485                 return ret;
1486         }
1487
1488         return 0;
1489 }
1490
1491 /*
1492   called when a broadcast seqnum update comes in
1493  */
1494 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1495 {
1496         struct ctdb_db_context *ctdb_db;
1497         if (srcnode == ctdb->pnn) {
1498                 /* don't update ourselves! */
1499                 return 0;
1500         }
1501
1502         ctdb_db = find_ctdb_db(ctdb, db_id);
1503         if (!ctdb_db) {
1504                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1505                 return -1;
1506         }
1507
1508         if (ctdb_db->unhealthy_reason) {
1509                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1510                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1511                 return -1;
1512         }
1513
1514         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1515         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1516         return 0;
1517 }
1518
1519 /*
1520   timer to check for seqnum changes in a ltdb and propogate them
1521  */
1522 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1523                                    struct tevent_timer *te,
1524                                    struct timeval t, void *p)
1525 {
1526         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1527         struct ctdb_context *ctdb = ctdb_db->ctdb;
1528         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1529         if (new_seqnum != ctdb_db->seqnum) {
1530                 /* something has changed - propogate it */
1531                 TDB_DATA data;
1532                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1533                 data.dsize = sizeof(uint32_t);
1534                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1535                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1536                                          data, NULL, NULL);             
1537         }
1538         ctdb_db->seqnum = new_seqnum;
1539
1540         /* setup a new timer */
1541         ctdb_db->seqnum_update =
1542                 tevent_add_timer(ctdb->ev, ctdb_db,
1543                                  timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1544                                                      (ctdb->tunable.seqnum_interval%1000)*1000),
1545                                  ctdb_ltdb_seqnum_check, ctdb_db);
1546 }
1547
1548 /*
1549   enable seqnum handling on this db
1550  */
1551 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1552 {
1553         struct ctdb_db_context *ctdb_db;
1554         ctdb_db = find_ctdb_db(ctdb, db_id);
1555         if (!ctdb_db) {
1556                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1557                 return -1;
1558         }
1559
1560         if (ctdb_db->seqnum_update == NULL) {
1561                 ctdb_db->seqnum_update = tevent_add_timer(
1562                         ctdb->ev, ctdb_db,
1563                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1564                                             (ctdb->tunable.seqnum_interval%1000)*1000),
1565                         ctdb_ltdb_seqnum_check, ctdb_db);
1566         }
1567
1568         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1569         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1570         return 0;
1571 }
1572
1573 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1574 {
1575         if (ctdb_db_sticky(ctdb_db)) {
1576                 return 0;
1577         }
1578
1579         if (! ctdb_db_volatile(ctdb_db)) {
1580                 DEBUG(DEBUG_ERR,
1581                       ("Non-volatile databases do not support sticky flag\n"));
1582                 return -1;
1583         }
1584
1585         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1586
1587         ctdb_db_set_sticky(ctdb_db);
1588
1589         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1590
1591         return 0;
1592 }
1593
1594 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1595 {
1596         struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1597         int i;
1598
1599         for (i=0; i<MAX_HOT_KEYS; i++) {
1600                 if (s->hot_keys[i].key.dsize > 0) {
1601                         talloc_free(s->hot_keys[i].key.dptr);
1602                 }
1603         }
1604
1605         ZERO_STRUCT(ctdb_db->statistics);
1606 }
1607
1608 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1609                                 uint32_t db_id,
1610                                 TDB_DATA *outdata)
1611 {
1612         struct ctdb_db_context *ctdb_db;
1613         struct ctdb_db_statistics_old *stats;
1614         int i;
1615         int len;
1616         char *ptr;
1617
1618         ctdb_db = find_ctdb_db(ctdb, db_id);
1619         if (!ctdb_db) {
1620                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1621                 return -1;
1622         }
1623
1624         len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1625         for (i = 0; i < MAX_HOT_KEYS; i++) {
1626                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1627         }
1628
1629         stats = talloc_size(outdata, len);
1630         if (stats == NULL) {
1631                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1632                 return -1;
1633         }
1634
1635         memcpy(stats, &ctdb_db->statistics,
1636                offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1637
1638         stats->num_hot_keys = MAX_HOT_KEYS;
1639
1640         ptr = &stats->hot_keys_wire[0];
1641         for (i = 0; i < MAX_HOT_KEYS; i++) {
1642                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1643                        ctdb_db->statistics.hot_keys[i].key.dsize);
1644                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1645         }
1646
1647         outdata->dptr  = (uint8_t *)stats;
1648         outdata->dsize = len;
1649
1650         return 0;
1651 }