ctdb-daemon: Pass db_flags instead of passing persistent flag
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
45
46 /**
47  * write a record to a normal database
48  *
49  * This is the server-variant of the ctdb_ltdb_store function.
50  * It contains logic to determine whether a record should be
51  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52  * controls to the local ctdb daemon if apporpriate.
53  */
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
55                                   TDB_DATA key,
56                                   struct ctdb_ltdb_header *header,
57                                   TDB_DATA data)
58 {
59         struct ctdb_context *ctdb = ctdb_db->ctdb;
60         TDB_DATA rec[2];
61         uint32_t hsize = sizeof(struct ctdb_ltdb_header);
62         int ret;
63         bool seqnum_suppressed = false;
64         bool keep = false;
65         bool schedule_for_deletion = false;
66         bool remove_from_delete_queue = false;
67         uint32_t lmaster;
68
69         if (ctdb->flags & CTDB_FLAG_TORTURE) {
70                 TDB_DATA old;
71                 struct ctdb_ltdb_header *h2;
72
73                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74                 h2 = (struct ctdb_ltdb_header *)old.dptr;
75                 if (old.dptr != NULL &&
76                     old.dsize >= hsize &&
77                     h2->rsn > header->rsn) {
78                         DEBUG(DEBUG_ERR,
79                               ("RSN regression! %"PRIu64" %"PRIu64"\n",
80                                h2->rsn, header->rsn));
81                 }
82                 if (old.dptr) {
83                         free(old.dptr);
84                 }
85         }
86
87         if (ctdb->vnn_map == NULL) {
88                 /*
89                  * Called from a client: always store the record
90                  * Also don't call ctdb_lmaster since it uses the vnn_map!
91                  */
92                 keep = true;
93                 goto store;
94         }
95
96         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
97
98         /*
99          * If we migrate an empty record off to another node
100          * and the record has not been migrated with data,
101          * delete the record instead of storing the empty record.
102          */
103         if (data.dsize != 0) {
104                 keep = true;
105         } else if (header->flags & CTDB_REC_RO_FLAGS) {
106                 keep = true;
107         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
108                 /*
109                  * The record is not created by the client but
110                  * automatically by the ctdb_ltdb_fetch logic that
111                  * creates a record with an initial header in the
112                  * ltdb before trying to migrate the record from
113                  * the current lmaster. Keep it instead of trying
114                  * to delete the non-existing record...
115                  */
116                 keep = true;
117                 schedule_for_deletion = true;
118         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
119                 keep = true;
120         } else if (ctdb_db->ctdb->pnn == lmaster) {
121                 /*
122                  * If we are lmaster, then we usually keep the record.
123                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
124                  * and the record is empty and has never been migrated
125                  * with data, then we should delete it instead of storing it.
126                  * This is part of the vacuuming process.
127                  *
128                  * The reason that we usually need to store even empty records
129                  * on the lmaster is that a client operating directly on the
130                  * lmaster (== dmaster) expects the local copy of the record to
131                  * exist after successful ctdb migrate call. If the record does
132                  * not exist, the client goes into a migrate loop and eventually
133                  * fails. So storing the empty record makes sure that we do not
134                  * need to change the client code.
135                  */
136                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
137                         keep = true;
138                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
139                         keep = true;
140                 }
141         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
142                 keep = true;
143         }
144
145         if (keep) {
146                 if (ctdb_db_volatile(ctdb_db) &&
147                     (ctdb_db->ctdb->pnn == header->dmaster) &&
148                     !(header->flags & CTDB_REC_RO_FLAGS))
149                 {
150                         header->rsn++;
151
152                         if (data.dsize == 0) {
153                                 schedule_for_deletion = true;
154                         }
155                 }
156                 remove_from_delete_queue = !schedule_for_deletion;
157         }
158
159 store:
160         /*
161          * The VACUUM_MIGRATED flag is only set temporarily for
162          * the above logic when the record was retrieved by a
163          * VACUUM_MIGRATE call and should not be stored in the
164          * database.
165          *
166          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
167          * and there are two cases in which the corresponding record
168          * is stored in the local database:
169          * 1. The record has been migrated with data in the past
170          *    (the MIGRATED_WITH_DATA record flag is set).
171          * 2. The record has been filled with data again since it
172          *    had been submitted in the VACUUM_FETCH message to the
173          *    lmaster.
174          * For such records it is important to not store the
175          * VACUUM_MIGRATED flag in the database.
176          */
177         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
178
179         /*
180          * Similarly, clear the AUTOMATIC flag which should not enter
181          * the local database copy since this would require client
182          * modifications to clear the flag when the client stores
183          * the record.
184          */
185         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
186
187         rec[0].dsize = hsize;
188         rec[0].dptr = (uint8_t *)header;
189
190         rec[1].dsize = data.dsize;
191         rec[1].dptr = data.dptr;
192
193         /* Databases with seqnum updates enabled only get their seqnum
194            changes when/if we modify the data */
195         if (ctdb_db->seqnum_update != NULL) {
196                 TDB_DATA old;
197                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
198
199                 if ((old.dsize == hsize + data.dsize) &&
200                     memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
201                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
202                         seqnum_suppressed = true;
203                 }
204                 if (old.dptr != NULL) {
205                         free(old.dptr);
206                 }
207         }
208
209         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
210                             ctdb_db->db_name,
211                             keep?"storing":"deleting",
212                             ctdb_hash(&key)));
213
214         if (keep) {
215                 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
216         } else {
217                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
218         }
219
220         if (ret != 0) {
221                 int lvl = DEBUG_ERR;
222
223                 if (keep == false &&
224                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
225                 {
226                         lvl = DEBUG_DEBUG;
227                 }
228
229                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
230                             "%d - %s\n",
231                             ctdb_db->db_name,
232                             keep?"store":"delete", ret,
233                             tdb_errorstr(ctdb_db->ltdb->tdb)));
234
235                 schedule_for_deletion = false;
236                 remove_from_delete_queue = false;
237         }
238         if (seqnum_suppressed) {
239                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
240         }
241
242         if (schedule_for_deletion) {
243                 int ret2;
244                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
245                 if (ret2 != 0) {
246                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
247                 }
248         }
249
250         if (remove_from_delete_queue) {
251                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
252         }
253
254         return ret;
255 }
256
/*
  state kept while a request waits for a record chainlock; handed to
  lock_fetch_callback() once the lock request completes
 */
struct lock_fetch_state {
        /* daemon context of the database the request targets */
        struct ctdb_context *ctdb;
        /* database whose record is being locked */
        struct ctdb_db_context *ctdb_db;
        /* packet handler the deferred request is re-submitted to */
        void (*recv_pkt)(void *, struct ctdb_req_header *);
        /* first argument passed to recv_pkt */
        void *recv_context;
        /* the deferred request packet */
        struct ctdb_req_header *hdr;
        /* ctdb_db->generation at the time the request was deferred */
        uint32_t generation;
        /* when true, re-submit even if the generation has changed */
        bool ignore_generation;
};
266
267 /*
268   called when we should retry the operation
269  */
270 static void lock_fetch_callback(void *p, bool locked)
271 {
272         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
273         if (!state->ignore_generation &&
274             state->generation != state->ctdb_db->generation) {
275                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
276                 talloc_free(state->hdr);
277                 return;
278         }
279         state->recv_pkt(state->recv_context, state->hdr);
280         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
281 }
282
283
284 /*
285   do a non-blocking ltdb_lock, deferring this ctdb request until we
286   have the chainlock
287
288   It does the following:
289
290    1) tries to get the chainlock. If it succeeds, then it returns 0
291
292    2) if it fails to get a chainlock immediately then it sets up a
293    non-blocking chainlock via ctdb_lock_record, and when it gets the
294    chainlock it re-submits this ctdb request to the main packet
295    receive function.
296
297    This effectively queues all ctdb requests that cannot be
298    immediately satisfied until it can get the lock. This means that
299    the main ctdb daemon will not block waiting for a chainlock held by
300    a client
301
302    There are 3 possible return values:
303
304        0:    means that it got the lock immediately.
305       -1:    means that it failed to get the lock, and won't retry
306       -2:    means that it failed to get the lock immediately, but will retry
307  */
308 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
309                            TDB_DATA key, struct ctdb_req_header *hdr,
310                            void (*recv_pkt)(void *, struct ctdb_req_header *),
311                            void *recv_context, bool ignore_generation)
312 {
313         int ret;
314         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
315         struct lock_request *lreq;
316         struct lock_fetch_state *state;
317         
318         ret = tdb_chainlock_nonblock(tdb, key);
319
320         if (ret != 0 &&
321             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
322                 /* a hard failure - don't try again */
323                 return -1;
324         }
325
326         /* when torturing, ensure we test the contended path */
327         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
328             random() % 5 == 0) {
329                 ret = -1;
330                 tdb_chainunlock(tdb, key);
331         }
332
333         /* first the non-contended path */
334         if (ret == 0) {
335                 return 0;
336         }
337
338         state = talloc(hdr, struct lock_fetch_state);
339         state->ctdb = ctdb_db->ctdb;
340         state->ctdb_db = ctdb_db;
341         state->hdr = hdr;
342         state->recv_pkt = recv_pkt;
343         state->recv_context = recv_context;
344         state->generation = ctdb_db->generation;
345         state->ignore_generation = ignore_generation;
346
347         /* now the contended path */
348         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
349         if (lreq == NULL) {
350                 return -1;
351         }
352
353         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
354            so it won't be freed yet */
355         talloc_steal(state, hdr);
356
357         /* now tell the caller than we will retry asynchronously */
358         return -2;
359 }
360
361 /*
362   a varient of ctdb_ltdb_lock_requeue that also fetches the record
363  */
364 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
365                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
366                                  struct ctdb_req_header *hdr, TDB_DATA *data,
367                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
368                                  void *recv_context, bool ignore_generation)
369 {
370         int ret;
371
372         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
373                                      recv_context, ignore_generation);
374         if (ret == 0) {
375                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
376                 if (ret != 0) {
377                         int uret;
378                         uret = ctdb_ltdb_unlock(ctdb_db, key);
379                         if (uret != 0) {
380                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
381                         }
382                 }
383         }
384         return ret;
385 }
386
387
388 /*
389   paraoid check to see if the db is empty
390  */
391 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
392 {
393         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
394         int count = tdb_traverse_read(tdb, NULL, NULL);
395         if (count != 0) {
396                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
397                          ctdb_db->db_path));
398                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
399         }
400 }
401
402 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
403                                 struct ctdb_db_context *ctdb_db)
404 {
405         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
406         char *old;
407         char *reason = NULL;
408         TDB_DATA key;
409         TDB_DATA val;
410
411         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
412         key.dsize = strlen(ctdb_db->db_name);
413
414         old = ctdb_db->unhealthy_reason;
415         ctdb_db->unhealthy_reason = NULL;
416
417         val = tdb_fetch(tdb, key);
418         if (val.dsize > 0) {
419                 reason = talloc_strndup(ctdb_db,
420                                         (const char *)val.dptr,
421                                         val.dsize);
422                 if (reason == NULL) {
423                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
424                                            (int)val.dsize));
425                         ctdb_db->unhealthy_reason = old;
426                         free(val.dptr);
427                         return -1;
428                 }
429         }
430
431         if (val.dptr) {
432                 free(val.dptr);
433         }
434
435         talloc_free(old);
436         ctdb_db->unhealthy_reason = reason;
437         return 0;
438 }
439
440 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
441                                   struct ctdb_db_context *ctdb_db,
442                                   const char *given_reason,/* NULL means healthy */
443                                   int num_healthy_nodes)
444 {
445         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
446         int ret;
447         TDB_DATA key;
448         TDB_DATA val;
449         char *new_reason = NULL;
450         char *old_reason = NULL;
451
452         ret = tdb_transaction_start(tdb);
453         if (ret != 0) {
454                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
455                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
456                 return -1;
457         }
458
459         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
460         if (ret != 0) {
461                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
462                                    ctdb_db->db_name, ret));
463                 return -1;
464         }
465         old_reason = ctdb_db->unhealthy_reason;
466
467         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
468         key.dsize = strlen(ctdb_db->db_name);
469
470         if (given_reason) {
471                 new_reason = talloc_strdup(ctdb_db, given_reason);
472                 if (new_reason == NULL) {
473                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
474                                           given_reason));
475                         return -1;
476                 }
477         } else if (old_reason && num_healthy_nodes == 0) {
478                 /*
479                  * If the reason indicates ok, but there where no healthy nodes
480                  * available, that it means, we have not recovered valid content
481                  * of the db. So if there's an old reason, prefix it with
482                  * "NO-HEALTHY-NODES - "
483                  */
484                 const char *prefix;
485
486 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
487                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
488                 if (ret != 0) {
489                         prefix = _TMP_PREFIX;
490                 } else {
491                         prefix = "";
492                 }
493                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
494                                          prefix, old_reason);
495                 if (new_reason == NULL) {
496                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
497                                           prefix, old_reason));
498                         return -1;
499                 }
500 #undef _TMP_PREFIX
501         }
502
503         if (new_reason) {
504                 val.dptr = discard_const_p(uint8_t, new_reason);
505                 val.dsize = strlen(new_reason);
506
507                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
508                 if (ret != 0) {
509                         tdb_transaction_cancel(tdb);
510                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
511                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
512                                            ret, tdb_errorstr(tdb)));
513                         talloc_free(new_reason);
514                         return -1;
515                 }
516                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
517                                    ctdb_db->db_name, new_reason));
518         } else if (old_reason) {
519                 ret = tdb_delete(tdb, key);
520                 if (ret != 0) {
521                         tdb_transaction_cancel(tdb);
522                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
523                                            tdb_name(tdb), ctdb_db->db_name,
524                                            ret, tdb_errorstr(tdb)));
525                         talloc_free(new_reason);
526                         return -1;
527                 }
528                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
529                                    ctdb_db->db_name));
530         }
531
532         ret = tdb_transaction_commit(tdb);
533         if (ret != TDB_SUCCESS) {
534                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
535                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
536                 talloc_free(new_reason);
537                 return -1;
538         }
539
540         talloc_free(old_reason);
541         ctdb_db->unhealthy_reason = new_reason;
542
543         return 0;
544 }
545
546 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
547                                      struct ctdb_db_context *ctdb_db)
548 {
549         time_t now = time(NULL);
550         char *new_path;
551         char *new_reason;
552         int ret;
553         struct tm *tm;
554
555         tm = gmtime(&now);
556
557         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
558         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
559                                    "%04u%02u%02u%02u%02u%02u.0Z",
560                                    ctdb_db->db_path,
561                                    tm->tm_year+1900, tm->tm_mon+1,
562                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
563                                    tm->tm_sec);
564         if (new_path == NULL) {
565                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
566                 return -1;
567         }
568
569         new_reason = talloc_asprintf(ctdb_db,
570                                      "ERROR - Backup of corrupted TDB in '%s'",
571                                      new_path);
572         if (new_reason == NULL) {
573                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
574                 return -1;
575         }
576         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
577         talloc_free(new_reason);
578         if (ret != 0) {
579                 DEBUG(DEBUG_CRIT,(__location__
580                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
581                                  ctdb_db->db_path));
582                 return -1;
583         }
584
585         ret = rename(ctdb_db->db_path, new_path);
586         if (ret != 0) {
587                 DEBUG(DEBUG_CRIT,(__location__
588                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
589                                   ctdb_db->db_path, new_path,
590                                   errno, strerror(errno)));
591                 talloc_free(new_path);
592                 return -1;
593         }
594
595         DEBUG(DEBUG_CRIT,(__location__
596                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
597                          ctdb_db->db_path, new_path));
598         talloc_free(new_path);
599         return 0;
600 }
601
602 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
603 {
604         struct ctdb_db_context *ctdb_db;
605         int ret;
606         int ok = 0;
607         int fail = 0;
608
609         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
610                 if (!ctdb_db_persistent(ctdb_db)) {
611                         continue;
612                 }
613
614                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
615                 if (ret != 0) {
616                         DEBUG(DEBUG_ALERT,(__location__
617                                            " load persistent health for '%s' failed\n",
618                                            ctdb_db->db_path));
619                         return -1;
620                 }
621
622                 if (ctdb_db->unhealthy_reason == NULL) {
623                         ok++;
624                         DEBUG(DEBUG_INFO,(__location__
625                                    " persistent db '%s' healthy\n",
626                                    ctdb_db->db_path));
627                         continue;
628                 }
629
630                 fail++;
631                 DEBUG(DEBUG_ALERT,(__location__
632                                    " persistent db '%s' unhealthy: %s\n",
633                                    ctdb_db->db_path,
634                                    ctdb_db->unhealthy_reason));
635         }
636         DEBUG(DEBUG_NOTICE,
637               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
638                ok, fail));
639
640         if (fail != 0) {
641                 return -1;
642         }
643
644         return 0;
645 }
646
647
648 /*
649   mark a database - as healthy
650  */
651 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
652 {
653         uint32_t db_id = *(uint32_t *)indata.dptr;
654         struct ctdb_db_context *ctdb_db;
655         int ret;
656         bool may_recover = false;
657
658         ctdb_db = find_ctdb_db(ctdb, db_id);
659         if (!ctdb_db) {
660                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
661                 return -1;
662         }
663
664         if (ctdb_db->unhealthy_reason) {
665                 may_recover = true;
666         }
667
668         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
669         if (ret != 0) {
670                 DEBUG(DEBUG_ERR,(__location__
671                                  " ctdb_update_persistent_health(%s) failed\n",
672                                  ctdb_db->db_name));
673                 return -1;
674         }
675
676         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
677                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
678                                   ctdb_db->db_name));
679                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
680         }
681
682         return 0;
683 }
684
685 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
686                                    TDB_DATA indata,
687                                    TDB_DATA *outdata)
688 {
689         uint32_t db_id = *(uint32_t *)indata.dptr;
690         struct ctdb_db_context *ctdb_db;
691         int ret;
692
693         ctdb_db = find_ctdb_db(ctdb, db_id);
694         if (!ctdb_db) {
695                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
696                 return -1;
697         }
698
699         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
700         if (ret != 0) {
701                 DEBUG(DEBUG_ERR,(__location__
702                                  " ctdb_load_persistent_health(%s) failed\n",
703                                  ctdb_db->db_name));
704                 return -1;
705         }
706
707         *outdata = tdb_null;
708         if (ctdb_db->unhealthy_reason) {
709                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
710                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
711         }
712
713         return 0;
714 }
715
716
717 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
718 {
719         char *ropath;
720
721         if (ctdb_db_readonly(ctdb_db)) {
722                 return 0;
723         }
724
725         if (! ctdb_db_volatile(ctdb_db)) {
726                 DEBUG(DEBUG_ERR,
727                       ("Non-volatile databases do not support readonly flag\n"));
728                 return -1;
729         }
730
731         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
732         if (ropath == NULL) {
733                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
734                 return -1;
735         }
736         ctdb_db->rottdb = tdb_open(ropath, 
737                               ctdb->tunable.database_hash_size, 
738                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
739                               O_CREAT|O_RDWR, 0600);
740         if (ctdb_db->rottdb == NULL) {
741                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
742                 talloc_free(ropath);
743                 return -1;
744         }
745
746         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
747
748         ctdb_db_set_readonly(ctdb_db);
749
750         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
751
752         talloc_free(ropath);
753         return 0;
754 }
755
756 /*
757   attach to a database, handling both persistent and non-persistent databases
758   return 0 on success, -1 on failure
759  */
760 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
761                              uint8_t db_flags, const char *unhealthy_reason)
762 {
763         struct ctdb_db_context *ctdb_db, *tmp_db;
764         int ret;
765         struct TDB_DATA key;
766         int tdb_flags;
767         int mode = 0600;
768         int remaining_tries = 0;
769
770         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
771         CTDB_NO_MEMORY(ctdb, ctdb_db);
772
773         ctdb_db->ctdb = ctdb;
774         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
775         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
776
777         key.dsize = strlen(db_name)+1;
778         key.dptr  = discard_const(db_name);
779         ctdb_db->db_id = ctdb_hash(&key);
780         ctdb_db->db_flags = db_flags;
781
782         if (ctdb_db_volatile(ctdb_db)) {
783                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
784                 if (ctdb_db->delete_queue == NULL) {
785                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
786                 }
787
788                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
789         }
790
791         /* check for hash collisions */
792         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
793                 if (tmp_db->db_id == ctdb_db->db_id) {
794                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
795                                  tmp_db->db_id, db_name, tmp_db->db_name));
796                         talloc_free(ctdb_db);
797                         return -1;
798                 }
799         }
800
801         if (ctdb_db_persistent(ctdb_db)) {
802                 if (unhealthy_reason) {
803                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
804                                                             unhealthy_reason, 0);
805                         if (ret != 0) {
806                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
807                                                    ctdb_db->db_name, unhealthy_reason, ret));
808                                 talloc_free(ctdb_db);
809                                 return -1;
810                         }
811                 }
812
813                 if (ctdb->max_persistent_check_errors > 0) {
814                         remaining_tries = 1;
815                 }
816                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
817                         remaining_tries = 0;
818                 }
819
820                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
821                 if (ret != 0) {
822                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
823                                    ctdb_db->db_name, ret));
824                         talloc_free(ctdb_db);
825                         return -1;
826                 }
827         }
828
829         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
830                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
831                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
832                 talloc_free(ctdb_db);
833                 return -1;
834         }
835
836         if (ctdb_db->unhealthy_reason) {
837                 /* this is just a warning, but we want that in the log file! */
838                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
839                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
840         }
841
842         /* open the database */
843         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
844                                            ctdb_db_persistent(ctdb_db) ?
845                                                 ctdb->db_directory_persistent :
846                                                 ctdb->db_directory,
847                                            db_name, ctdb->pnn);
848
849         tdb_flags = ctdb_db_tdb_flags(db_flags, ctdb->valgrinding,
850                                       ctdb->tunable.mutex_enabled);
851
852 again:
853         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
854                                       ctdb->tunable.database_hash_size, 
855                                       tdb_flags, 
856                                       O_CREAT|O_RDWR, mode);
857         if (ctdb_db->ltdb == NULL) {
858                 struct stat st;
859                 int saved_errno = errno;
860
861                 if (! ctdb_db_persistent(ctdb_db)) {
862                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
863                                           ctdb_db->db_path,
864                                           saved_errno,
865                                           strerror(saved_errno)));
866                         talloc_free(ctdb_db);
867                         return -1;
868                 }
869
870                 if (remaining_tries == 0) {
871                         DEBUG(DEBUG_CRIT,(__location__
872                                           "Failed to open persistent tdb '%s': %d - %s\n",
873                                           ctdb_db->db_path,
874                                           saved_errno,
875                                           strerror(saved_errno)));
876                         talloc_free(ctdb_db);
877                         return -1;
878                 }
879
880                 ret = stat(ctdb_db->db_path, &st);
881                 if (ret != 0) {
882                         DEBUG(DEBUG_CRIT,(__location__
883                                           "Failed to open persistent tdb '%s': %d - %s\n",
884                                           ctdb_db->db_path,
885                                           saved_errno,
886                                           strerror(saved_errno)));
887                         talloc_free(ctdb_db);
888                         return -1;
889                 }
890
891                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
892                 if (ret != 0) {
893                         DEBUG(DEBUG_CRIT,(__location__
894                                           "Failed to open persistent tdb '%s': %d - %s\n",
895                                           ctdb_db->db_path,
896                                           saved_errno,
897                                           strerror(saved_errno)));
898                         talloc_free(ctdb_db);
899                         return -1;
900                 }
901
902                 remaining_tries--;
903                 mode = st.st_mode;
904                 goto again;
905         }
906
907         if (!ctdb_db_persistent(ctdb_db)) {
908                 ctdb_check_db_empty(ctdb_db);
909         } else {
910                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
911                 if (ret != 0) {
912                         int fd;
913                         struct stat st;
914
915                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
916                                           ctdb_db->db_path, ret,
917                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
918                         if (remaining_tries == 0) {
919                                 talloc_free(ctdb_db);
920                                 return -1;
921                         }
922
923                         fd = tdb_fd(ctdb_db->ltdb->tdb);
924                         ret = fstat(fd, &st);
925                         if (ret != 0) {
926                                 DEBUG(DEBUG_CRIT,(__location__
927                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
928                                                   ctdb_db->db_path,
929                                                   errno,
930                                                   strerror(errno)));
931                                 talloc_free(ctdb_db);
932                                 return -1;
933                         }
934
935                         /* close the TDB */
936                         talloc_free(ctdb_db->ltdb);
937                         ctdb_db->ltdb = NULL;
938
939                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
940                         if (ret != 0) {
941                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
942                                                   ctdb_db->db_path));
943                                 talloc_free(ctdb_db);
944                                 return -1;
945                         }
946
947                         remaining_tries--;
948                         mode = st.st_mode;
949                         goto again;
950                 }
951         }
952
953         /* remember the flags the client has specified */
954         tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
955
956
957         /* set up a rb tree we can use to track which records we have a 
958            fetch-lock in-flight for so we can defer any additional calls
959            for the same record.
960          */
961         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
962         if (ctdb_db->deferred_fetch == NULL) {
963                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
964                 talloc_free(ctdb_db);
965                 return -1;
966         }
967
968         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
969         if (ctdb_db->defer_dmaster == NULL) {
970                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
971                                   ctdb_db->db_name));
972                 talloc_free(ctdb_db);
973                 return -1;
974         }
975
976         DLIST_ADD(ctdb->db_list, ctdb_db);
977
978         /* setting this can help some high churn databases */
979         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
980
981         /* 
982            all databases support the "null" function. we need this in
983            order to do forced migration of records
984         */
985         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
986         if (ret != 0) {
987                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
988                 talloc_free(ctdb_db);
989                 return -1;
990         }
991
992         /* 
993            all databases support the "fetch" function. we need this
994            for efficient Samba3 ctdb fetch
995         */
996         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
997         if (ret != 0) {
998                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
999                 talloc_free(ctdb_db);
1000                 return -1;
1001         }
1002
1003         /* 
1004            all databases support the "fetch_with_header" function. we need this
1005            for efficient readonly record fetches
1006         */
1007         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1008         if (ret != 0) {
1009                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1010                 talloc_free(ctdb_db);
1011                 return -1;
1012         }
1013
1014         ret = ctdb_vacuum_init(ctdb_db);
1015         if (ret != 0) {
1016                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1017                                   "database '%s'\n", ctdb_db->db_name));
1018                 talloc_free(ctdb_db);
1019                 return -1;
1020         }
1021
1022         ret = ctdb_migration_init(ctdb_db);
1023         if (ret != 0) {
1024                 DEBUG(DEBUG_ERR,
1025                       ("Failed to setup migration tracking for db '%s'\n",
1026                        ctdb_db->db_name));
1027                 talloc_free(ctdb_db);
1028                 return -1;
1029         }
1030
1031         ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1032                            &ctdb_db->lock_log);
1033         if (ret != 0) {
1034                 DEBUG(DEBUG_ERR,
1035                       ("Failed to setup lock logging for db '%s'\n",
1036                        ctdb_db->db_name));
1037                 talloc_free(ctdb_db);
1038                 return -1;
1039         }
1040
1041         ctdb_db->generation = ctdb->vnn_map->generation;
1042
1043         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1044                             ctdb_db->db_path, tdb_flags));
1045
1046         /* success */
1047         return 0;
1048 }
1049
1050
/*
 * State for one DB attach request that has been deferred.
 * Entries are linked on ctdb->deferred_attach.
 */
struct ctdb_deferred_attach_context {
	/* doubly-linked list pointers used by the DLIST_* macros */
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	/* daemon context that owns the deferred_attach list */
	struct ctdb_context *ctdb;
	/* the deferred control request packet */
	struct ctdb_req_control_old *c;
};
1056
1057
1058 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1059 {
1060         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1061
1062         return 0;
1063 }
1064
1065 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1066                                          struct tevent_timer *te,
1067                                          struct timeval t, void *private_data)
1068 {
1069         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1070         struct ctdb_context *ctdb = da_ctx->ctdb;
1071
1072         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1073         talloc_free(da_ctx);
1074 }
1075
1076 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1077                                           struct tevent_timer *te,
1078                                           struct timeval t, void *private_data)
1079 {
1080         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1081         struct ctdb_context *ctdb = da_ctx->ctdb;
1082
1083         /* This talloc-steals the packet ->c */
1084         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1085         talloc_free(da_ctx);
1086 }
1087
1088 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1089 {
1090         struct ctdb_deferred_attach_context *da_ctx;
1091
1092         /* call it from the main event loop as soon as the current event 
1093            finishes.
1094          */
1095         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1096                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1097                 tevent_add_timer(ctdb->ev, da_ctx,
1098                                  timeval_current_ofs(1,0),
1099                                  ctdb_deferred_attach_callback, da_ctx);
1100         }
1101
1102         return 0;
1103 }
1104
1105 /*
1106   a client has asked to attach a new database
1107  */
1108 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1109                                TDB_DATA *outdata,
1110                                uint8_t db_flags, uint32_t client_id,
1111                                struct ctdb_req_control_old *c,
1112                                bool *async_reply)
1113 {
1114         const char *db_name = (const char *)indata.dptr;
1115         struct ctdb_db_context *db;
1116         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1117         struct ctdb_client *client = NULL;
1118
1119         if (ctdb->tunable.allow_client_db_attach == 0) {
1120                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1121                                   "AllowClientDBAccess == 0\n", db_name));
1122                 return -1;
1123         }
1124
1125         /* don't allow any local clients to attach while we are in recovery mode
1126          * except for the recovery daemon.
1127          * allow all attach from the network since these are always from remote
1128          * recovery daemons.
1129          */
1130         if (client_id != 0) {
1131                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1132         }
1133         if (client != NULL) {
1134                 /* If the node is inactive it is not part of the cluster
1135                    and we should not allow clients to attach to any
1136                    databases
1137                 */
1138                 if (node->flags & NODE_FLAGS_INACTIVE) {
1139                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1140                         return -1;
1141                 }
1142
1143                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1144                     client->pid != ctdb->recoverd_pid &&
1145                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1146                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1147
1148                         if (da_ctx == NULL) {
1149                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1150                                 return -1;
1151                         }
1152
1153                         da_ctx->ctdb = ctdb;
1154                         da_ctx->c = talloc_steal(da_ctx, c);
1155                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1156                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1157
1158                         tevent_add_timer(ctdb->ev, da_ctx,
1159                                          timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1160                                          ctdb_deferred_attach_timeout, da_ctx);
1161
1162                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1163                         *async_reply = true;
1164                         return 0;
1165                 }
1166         }
1167
1168         /* see if we already have this name */
1169         db = ctdb_db_handle(ctdb, db_name);
1170         if (db) {
1171                 if ((db->db_flags & db_flags) != db_flags) {
1172                         DEBUG(DEBUG_ERR,
1173                               ("Error: Failed to re-attach with 0x%x flags,"
1174                                " database has 0x%x flags\n", db_flags,
1175                                db->db_flags));
1176                         return -1;
1177                 }
1178                 outdata->dptr  = (uint8_t *)&db->db_id;
1179                 outdata->dsize = sizeof(db->db_id);
1180                 return 0;
1181         }
1182
1183         if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
1184                 return -1;
1185         }
1186
1187         db = ctdb_db_handle(ctdb, db_name);
1188         if (!db) {
1189                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1190                 return -1;
1191         }
1192
1193         outdata->dptr  = (uint8_t *)&db->db_id;
1194         outdata->dsize = sizeof(db->db_id);
1195
1196         /* Try to ensure it's locked in mem */
1197         lockdown_memory(ctdb->valgrinding);
1198
1199         /* tell all the other nodes about this database */
1200         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1201                                  ctdb_db_persistent(db) ?
1202                                         CTDB_CONTROL_DB_ATTACH_PERSISTENT :
1203                                         CTDB_CONTROL_DB_ATTACH,
1204                                  0, CTDB_CTRL_FLAG_NOREPLY,
1205                                  indata, NULL, NULL);
1206
1207         /* success */
1208         return 0;
1209 }
1210
1211 /*
1212  * a client has asked to detach from a database
1213  */
1214 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1215                                uint32_t client_id)
1216 {
1217         uint32_t db_id;
1218         struct ctdb_db_context *ctdb_db;
1219         struct ctdb_client *client = NULL;
1220
1221         db_id = *(uint32_t *)indata.dptr;
1222         ctdb_db = find_ctdb_db(ctdb, db_id);
1223         if (ctdb_db == NULL) {
1224                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1225                                   db_id));
1226                 return -1;
1227         }
1228
1229         if (ctdb->tunable.allow_client_db_attach == 1) {
1230                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1231                                   "Clients are allowed access to databases "
1232                                   "(AllowClientDBAccess == 1)\n",
1233                                   ctdb_db->db_name));
1234                 return -1;
1235         }
1236
1237         if (! ctdb_db_volatile(ctdb_db)) {
1238                 DEBUG(DEBUG_ERR,
1239                       ("Detaching non-volatile database %s denied\n",
1240                        ctdb_db->db_name));
1241                 return -1;
1242         }
1243
1244         /* Cannot detach from database when in recovery */
1245         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1246                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1247                 return -1;
1248         }
1249
1250         /* If a control comes from a client, then broadcast it to all nodes.
1251          * Do the actual detach only if the control comes from other daemons.
1252          */
1253         if (client_id != 0) {
1254                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1255                 if (client != NULL) {
1256                         /* forward the control to all the nodes */
1257                         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1258                                                  CTDB_CONTROL_DB_DETACH, 0,
1259                                                  CTDB_CTRL_FLAG_NOREPLY,
1260                                                  indata, NULL, NULL);
1261                         return 0;
1262                 }
1263                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1264                                   "for database '%s'\n", ctdb_db->db_name));
1265                 return -1;
1266         }
1267
1268         /* Detach database from recoverd */
1269         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1270                                      CTDB_SRVID_DETACH_DATABASE,
1271                                      indata) != 0) {
1272                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1273                 return -1;
1274         }
1275
1276         /* Disable vacuuming and drop all vacuuming data */
1277         talloc_free(ctdb_db->vacuum_handle);
1278         talloc_free(ctdb_db->delete_queue);
1279
1280         /* Terminate any deferred fetch */
1281         talloc_free(ctdb_db->deferred_fetch);
1282
1283         /* Terminate any traverses */
1284         while (ctdb_db->traverse) {
1285                 talloc_free(ctdb_db->traverse);
1286         }
1287
1288         /* Terminate any revokes */
1289         while (ctdb_db->revokechild_active) {
1290                 talloc_free(ctdb_db->revokechild_active);
1291         }
1292
1293         /* Free readonly tracking database */
1294         if (ctdb_db_readonly(ctdb_db)) {
1295                 talloc_free(ctdb_db->rottdb);
1296         }
1297
1298         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1299
1300         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1301                              ctdb_db->db_name));
1302         talloc_free(ctdb_db);
1303
1304         return 0;
1305 }
1306
1307 /*
1308   attach to all existing persistent databases
1309  */
1310 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1311                                   const char *unhealthy_reason)
1312 {
1313         DIR *d;
1314         struct dirent *de;
1315
1316         /* open the persistent db directory and scan it for files */
1317         d = opendir(ctdb->db_directory_persistent);
1318         if (d == NULL) {
1319                 return 0;
1320         }
1321
1322         while ((de=readdir(d))) {
1323                 char *p, *s, *q;
1324                 size_t len = strlen(de->d_name);
1325                 uint32_t node;
1326                 int invalid_name = 0;
1327                 
1328                 s = talloc_strdup(ctdb, de->d_name);
1329                 if (s == NULL) {
1330                         closedir(d);
1331                         CTDB_NO_MEMORY(ctdb, s);
1332                 }
1333
1334                 /* only accept names ending in .tdb */
1335                 p = strstr(s, ".tdb.");
1336                 if (len < 7 || p == NULL) {
1337                         talloc_free(s);
1338                         continue;
1339                 }
1340
1341                 /* only accept names ending with .tdb. and any number of digits */
1342                 q = p+5;
1343                 while (*q != 0 && invalid_name == 0) {
1344                         if (!isdigit(*q++)) {
1345                                 invalid_name = 1;
1346                         }
1347                 }
1348                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1349                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1350                         talloc_free(s);
1351                         continue;
1352                 }
1353                 p[4] = 0;
1354
1355                 if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
1356                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1357                         closedir(d);
1358                         talloc_free(s);
1359                         return -1;
1360                 }
1361
1362                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1363
1364                 talloc_free(s);
1365         }
1366         closedir(d);
1367         return 0;
1368 }
1369
1370 int ctdb_attach_databases(struct ctdb_context *ctdb)
1371 {
1372         int ret;
1373         char *persistent_health_path = NULL;
1374         char *unhealthy_reason = NULL;
1375         bool first_try = true;
1376
1377         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1378                                                  ctdb->db_directory_state,
1379                                                  PERSISTENT_HEALTH_TDB,
1380                                                  ctdb->pnn);
1381         if (persistent_health_path == NULL) {
1382                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1383                 return -1;
1384         }
1385
1386 again:
1387
1388         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1389                                                    0, TDB_DISALLOW_NESTING,
1390                                                    O_CREAT | O_RDWR, 0600);
1391         if (ctdb->db_persistent_health == NULL) {
1392                 struct tdb_wrap *tdb;
1393
1394                 if (!first_try) {
1395                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1396                                           persistent_health_path,
1397                                           errno,
1398                                           strerror(errno)));
1399                         talloc_free(persistent_health_path);
1400                         talloc_free(unhealthy_reason);
1401                         return -1;
1402                 }
1403                 first_try = false;
1404
1405                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1406                                                    persistent_health_path,
1407                                                    "was cleared after a failure",
1408                                                    "manual verification needed");
1409                 if (unhealthy_reason == NULL) {
1410                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1411                         talloc_free(persistent_health_path);
1412                         return -1;
1413                 }
1414
1415                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1416                                   persistent_health_path));
1417                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1418                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1419                                     O_CREAT | O_RDWR, 0600);
1420                 if (tdb) {
1421                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1422                                           persistent_health_path,
1423                                           errno,
1424                                           strerror(errno)));
1425                         talloc_free(persistent_health_path);
1426                         talloc_free(unhealthy_reason);
1427                         return -1;
1428                 }
1429
1430                 talloc_free(tdb);
1431                 goto again;
1432         }
1433         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1434         if (ret != 0) {
1435                 struct tdb_wrap *tdb;
1436
1437                 talloc_free(ctdb->db_persistent_health);
1438                 ctdb->db_persistent_health = NULL;
1439
1440                 if (!first_try) {
1441                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1442                                           persistent_health_path));
1443                         talloc_free(persistent_health_path);
1444                         talloc_free(unhealthy_reason);
1445                         return -1;
1446                 }
1447                 first_try = false;
1448
1449                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1450                                                    persistent_health_path,
1451                                                    "was cleared after a failure",
1452                                                    "manual verification needed");
1453                 if (unhealthy_reason == NULL) {
1454                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1455                         talloc_free(persistent_health_path);
1456                         return -1;
1457                 }
1458
1459                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1460                                   persistent_health_path));
1461                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1462                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1463                                     O_CREAT | O_RDWR, 0600);
1464                 if (tdb) {
1465                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1466                                           persistent_health_path,
1467                                           errno,
1468                                           strerror(errno)));
1469                         talloc_free(persistent_health_path);
1470                         talloc_free(unhealthy_reason);
1471                         return -1;
1472                 }
1473
1474                 talloc_free(tdb);
1475                 goto again;
1476         }
1477         talloc_free(persistent_health_path);
1478
1479         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1480         talloc_free(unhealthy_reason);
1481         if (ret != 0) {
1482                 return ret;
1483         }
1484
1485         return 0;
1486 }
1487
1488 /*
1489   called when a broadcast seqnum update comes in
1490  */
1491 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1492 {
1493         struct ctdb_db_context *ctdb_db;
1494         if (srcnode == ctdb->pnn) {
1495                 /* don't update ourselves! */
1496                 return 0;
1497         }
1498
1499         ctdb_db = find_ctdb_db(ctdb, db_id);
1500         if (!ctdb_db) {
1501                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1502                 return -1;
1503         }
1504
1505         if (ctdb_db->unhealthy_reason) {
1506                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1507                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1508                 return -1;
1509         }
1510
1511         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1512         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1513         return 0;
1514 }
1515
1516 /*
1517   timer to check for seqnum changes in a ltdb and propogate them
1518  */
1519 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1520                                    struct tevent_timer *te,
1521                                    struct timeval t, void *p)
1522 {
1523         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1524         struct ctdb_context *ctdb = ctdb_db->ctdb;
1525         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1526         if (new_seqnum != ctdb_db->seqnum) {
1527                 /* something has changed - propogate it */
1528                 TDB_DATA data;
1529                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1530                 data.dsize = sizeof(uint32_t);
1531                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1532                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1533                                          data, NULL, NULL);             
1534         }
1535         ctdb_db->seqnum = new_seqnum;
1536
1537         /* setup a new timer */
1538         ctdb_db->seqnum_update =
1539                 tevent_add_timer(ctdb->ev, ctdb_db,
1540                                  timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1541                                                      (ctdb->tunable.seqnum_interval%1000)*1000),
1542                                  ctdb_ltdb_seqnum_check, ctdb_db);
1543 }
1544
1545 /*
1546   enable seqnum handling on this db
1547  */
1548 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1549 {
1550         struct ctdb_db_context *ctdb_db;
1551         ctdb_db = find_ctdb_db(ctdb, db_id);
1552         if (!ctdb_db) {
1553                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1554                 return -1;
1555         }
1556
1557         if (ctdb_db->seqnum_update == NULL) {
1558                 ctdb_db->seqnum_update = tevent_add_timer(
1559                         ctdb->ev, ctdb_db,
1560                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1561                                             (ctdb->tunable.seqnum_interval%1000)*1000),
1562                         ctdb_ltdb_seqnum_check, ctdb_db);
1563         }
1564
1565         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1566         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1567         return 0;
1568 }
1569
1570 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1571 {
1572         if (ctdb_db_sticky(ctdb_db)) {
1573                 return 0;
1574         }
1575
1576         if (! ctdb_db_volatile(ctdb_db)) {
1577                 DEBUG(DEBUG_ERR,
1578                       ("Non-volatile databases do not support sticky flag\n"));
1579                 return -1;
1580         }
1581
1582         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1583
1584         ctdb_db_set_sticky(ctdb_db);
1585
1586         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1587
1588         return 0;
1589 }
1590
1591 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1592 {
1593         struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1594         int i;
1595
1596         for (i=0; i<MAX_HOT_KEYS; i++) {
1597                 if (s->hot_keys[i].key.dsize > 0) {
1598                         talloc_free(s->hot_keys[i].key.dptr);
1599                 }
1600         }
1601
1602         ZERO_STRUCT(ctdb_db->statistics);
1603 }
1604
1605 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1606                                 uint32_t db_id,
1607                                 TDB_DATA *outdata)
1608 {
1609         struct ctdb_db_context *ctdb_db;
1610         struct ctdb_db_statistics_old *stats;
1611         int i;
1612         int len;
1613         char *ptr;
1614
1615         ctdb_db = find_ctdb_db(ctdb, db_id);
1616         if (!ctdb_db) {
1617                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1618                 return -1;
1619         }
1620
1621         len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1622         for (i = 0; i < MAX_HOT_KEYS; i++) {
1623                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1624         }
1625
1626         stats = talloc_size(outdata, len);
1627         if (stats == NULL) {
1628                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1629                 return -1;
1630         }
1631
1632         memcpy(stats, &ctdb_db->statistics,
1633                offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1634
1635         stats->num_hot_keys = MAX_HOT_KEYS;
1636
1637         ptr = &stats->hot_keys_wire[0];
1638         for (i = 0; i < MAX_HOT_KEYS; i++) {
1639                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1640                        ctdb_db->statistics.hot_keys[i].key.dsize);
1641                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1642         }
1643
1644         outdata->dptr  = (uint8_t *)stats;
1645         outdata->dsize = len;
1646
1647         return 0;
1648 }