ctdb-daemon: Fix CID 1272855 Operands don't affect result
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
45
46 /**
47  * write a record to a normal database
48  *
49  * This is the server-variant of the ctdb_ltdb_store function.
50  * It contains logic to determine whether a record should be
51  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52  * controls to the local ctdb daemon if apporpriate.
53  */
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
55                                   TDB_DATA key,
56                                   struct ctdb_ltdb_header *header,
57                                   TDB_DATA data)
58 {
59         struct ctdb_context *ctdb = ctdb_db->ctdb;
60         TDB_DATA rec;
61         int ret;
62         bool seqnum_suppressed = false;
63         bool keep = false;
64         bool schedule_for_deletion = false;
65         bool remove_from_delete_queue = false;
66         uint32_t lmaster;
67
68         if (ctdb->flags & CTDB_FLAG_TORTURE) {
69                 struct ctdb_ltdb_header *h2;
70                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
71                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
72                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
73                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
74                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
75                 }
76                 if (rec.dptr) free(rec.dptr);
77         }
78
79         if (ctdb->vnn_map == NULL) {
80                 /*
81                  * Called from a client: always store the record
82                  * Also don't call ctdb_lmaster since it uses the vnn_map!
83                  */
84                 keep = true;
85                 goto store;
86         }
87
88         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
89
90         /*
91          * If we migrate an empty record off to another node
92          * and the record has not been migrated with data,
93          * delete the record instead of storing the empty record.
94          */
95         if (data.dsize != 0) {
96                 keep = true;
97         } else if (header->flags & CTDB_REC_RO_FLAGS) {
98                 keep = true;
99         } else if (ctdb_db->persistent) {
100                 keep = true;
101         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
102                 /*
103                  * The record is not created by the client but
104                  * automatically by the ctdb_ltdb_fetch logic that
105                  * creates a record with an initial header in the
106                  * ltdb before trying to migrate the record from
107                  * the current lmaster. Keep it instead of trying
108                  * to delete the non-existing record...
109                  */
110                 keep = true;
111                 schedule_for_deletion = true;
112         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
113                 keep = true;
114         } else if (ctdb_db->ctdb->pnn == lmaster) {
115                 /*
116                  * If we are lmaster, then we usually keep the record.
117                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
118                  * and the record is empty and has never been migrated
119                  * with data, then we should delete it instead of storing it.
120                  * This is part of the vacuuming process.
121                  *
122                  * The reason that we usually need to store even empty records
123                  * on the lmaster is that a client operating directly on the
124                  * lmaster (== dmaster) expects the local copy of the record to
125                  * exist after successful ctdb migrate call. If the record does
126                  * not exist, the client goes into a migrate loop and eventually
127                  * fails. So storing the empty record makes sure that we do not
128                  * need to change the client code.
129                  */
130                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
131                         keep = true;
132                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
133                         keep = true;
134                 }
135         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
136                 keep = true;
137         }
138
139         if (keep) {
140                 if (!ctdb_db->persistent &&
141                     (ctdb_db->ctdb->pnn == header->dmaster) &&
142                     !(header->flags & CTDB_REC_RO_FLAGS))
143                 {
144                         header->rsn++;
145
146                         if (data.dsize == 0) {
147                                 schedule_for_deletion = true;
148                         }
149                 }
150                 remove_from_delete_queue = !schedule_for_deletion;
151         }
152
153 store:
154         /*
155          * The VACUUM_MIGRATED flag is only set temporarily for
156          * the above logic when the record was retrieved by a
157          * VACUUM_MIGRATE call and should not be stored in the
158          * database.
159          *
160          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
161          * and there are two cases in which the corresponding record
162          * is stored in the local database:
163          * 1. The record has been migrated with data in the past
164          *    (the MIGRATED_WITH_DATA record flag is set).
165          * 2. The record has been filled with data again since it
166          *    had been submitted in the VACUUM_FETCH message to the
167          *    lmaster.
168          * For such records it is important to not store the
169          * VACUUM_MIGRATED flag in the database.
170          */
171         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
172
173         /*
174          * Similarly, clear the AUTOMATIC flag which should not enter
175          * the local database copy since this would require client
176          * modifications to clear the flag when the client stores
177          * the record.
178          */
179         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
180
181         rec.dsize = sizeof(*header) + data.dsize;
182         rec.dptr = talloc_size(ctdb, rec.dsize);
183         CTDB_NO_MEMORY(ctdb, rec.dptr);
184
185         memcpy(rec.dptr, header, sizeof(*header));
186         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
187
188         /* Databases with seqnum updates enabled only get their seqnum
189            changes when/if we modify the data */
190         if (ctdb_db->seqnum_update != NULL) {
191                 TDB_DATA old;
192                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
193
194                 if ( (old.dsize == rec.dsize)
195                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
196                           rec.dptr+sizeof(struct ctdb_ltdb_header),
197                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
198                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
199                         seqnum_suppressed = true;
200                 }
201                 if (old.dptr) free(old.dptr);
202         }
203
204         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
205                             ctdb_db->db_name,
206                             keep?"storing":"deleting",
207                             ctdb_hash(&key)));
208
209         if (keep) {
210                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
211         } else {
212                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
213         }
214
215         if (ret != 0) {
216                 int lvl = DEBUG_ERR;
217
218                 if (keep == false &&
219                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
220                 {
221                         lvl = DEBUG_DEBUG;
222                 }
223
224                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
225                             "%d - %s\n",
226                             ctdb_db->db_name,
227                             keep?"store":"delete", ret,
228                             tdb_errorstr(ctdb_db->ltdb->tdb)));
229
230                 schedule_for_deletion = false;
231                 remove_from_delete_queue = false;
232         }
233         if (seqnum_suppressed) {
234                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
235         }
236
237         talloc_free(rec.dptr);
238
239         if (schedule_for_deletion) {
240                 int ret2;
241                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
242                 if (ret2 != 0) {
243                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
244                 }
245         }
246
247         if (remove_from_delete_queue) {
248                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
249         }
250
251         return ret;
252 }
253
/*
  state kept for a ctdb request that is deferred until its chainlock
  has been obtained (see ctdb_ltdb_lock_requeue)
 */
struct lock_fetch_state {
	/* daemon context and database the request operates on */
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;

	/* function (and its private argument) the packet is re-submitted to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	void *recv_context;

	/* the deferred request packet */
	struct ctdb_req_header *hdr;

	/* database generation recorded when the request was queued */
	uint32_t generation;

	/* if true, re-submit even if the generation has changed meanwhile */
	bool ignore_generation;
};
263
264 /*
265   called when we should retry the operation
266  */
267 static void lock_fetch_callback(void *p, bool locked)
268 {
269         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
270         if (!state->ignore_generation &&
271             state->generation != state->ctdb_db->generation) {
272                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
273                 talloc_free(state->hdr);
274                 return;
275         }
276         state->recv_pkt(state->recv_context, state->hdr);
277         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
278 }
279
280
281 /*
282   do a non-blocking ltdb_lock, deferring this ctdb request until we
283   have the chainlock
284
285   It does the following:
286
287    1) tries to get the chainlock. If it succeeds, then it returns 0
288
289    2) if it fails to get a chainlock immediately then it sets up a
290    non-blocking chainlock via ctdb_lock_record, and when it gets the
291    chainlock it re-submits this ctdb request to the main packet
292    receive function.
293
294    This effectively queues all ctdb requests that cannot be
295    immediately satisfied until it can get the lock. This means that
296    the main ctdb daemon will not block waiting for a chainlock held by
297    a client
298
299    There are 3 possible return values:
300
301        0:    means that it got the lock immediately.
302       -1:    means that it failed to get the lock, and won't retry
303       -2:    means that it failed to get the lock immediately, but will retry
304  */
305 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
306                            TDB_DATA key, struct ctdb_req_header *hdr,
307                            void (*recv_pkt)(void *, struct ctdb_req_header *),
308                            void *recv_context, bool ignore_generation)
309 {
310         int ret;
311         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
312         struct lock_request *lreq;
313         struct lock_fetch_state *state;
314         
315         ret = tdb_chainlock_nonblock(tdb, key);
316
317         if (ret != 0 &&
318             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
319                 /* a hard failure - don't try again */
320                 return -1;
321         }
322
323         /* when torturing, ensure we test the contended path */
324         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
325             random() % 5 == 0) {
326                 ret = -1;
327                 tdb_chainunlock(tdb, key);
328         }
329
330         /* first the non-contended path */
331         if (ret == 0) {
332                 return 0;
333         }
334
335         state = talloc(hdr, struct lock_fetch_state);
336         state->ctdb = ctdb_db->ctdb;
337         state->ctdb_db = ctdb_db;
338         state->hdr = hdr;
339         state->recv_pkt = recv_pkt;
340         state->recv_context = recv_context;
341         state->generation = ctdb_db->generation;
342         state->ignore_generation = ignore_generation;
343
344         /* now the contended path */
345         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
346         if (lreq == NULL) {
347                 return -1;
348         }
349
350         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
351            so it won't be freed yet */
352         talloc_steal(state, hdr);
353
354         /* now tell the caller than we will retry asynchronously */
355         return -2;
356 }
357
358 /*
359   a varient of ctdb_ltdb_lock_requeue that also fetches the record
360  */
361 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
362                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
363                                  struct ctdb_req_header *hdr, TDB_DATA *data,
364                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
365                                  void *recv_context, bool ignore_generation)
366 {
367         int ret;
368
369         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
370                                      recv_context, ignore_generation);
371         if (ret == 0) {
372                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
373                 if (ret != 0) {
374                         int uret;
375                         uret = ctdb_ltdb_unlock(ctdb_db, key);
376                         if (uret != 0) {
377                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
378                         }
379                 }
380         }
381         return ret;
382 }
383
384
385 /*
386   paraoid check to see if the db is empty
387  */
388 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
389 {
390         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
391         int count = tdb_traverse_read(tdb, NULL, NULL);
392         if (count != 0) {
393                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
394                          ctdb_db->db_path));
395                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
396         }
397 }
398
399 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
400                                 struct ctdb_db_context *ctdb_db)
401 {
402         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
403         char *old;
404         char *reason = NULL;
405         TDB_DATA key;
406         TDB_DATA val;
407
408         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
409         key.dsize = strlen(ctdb_db->db_name);
410
411         old = ctdb_db->unhealthy_reason;
412         ctdb_db->unhealthy_reason = NULL;
413
414         val = tdb_fetch(tdb, key);
415         if (val.dsize > 0) {
416                 reason = talloc_strndup(ctdb_db,
417                                         (const char *)val.dptr,
418                                         val.dsize);
419                 if (reason == NULL) {
420                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
421                                            (int)val.dsize));
422                         ctdb_db->unhealthy_reason = old;
423                         free(val.dptr);
424                         return -1;
425                 }
426         }
427
428         if (val.dptr) {
429                 free(val.dptr);
430         }
431
432         talloc_free(old);
433         ctdb_db->unhealthy_reason = reason;
434         return 0;
435 }
436
437 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
438                                   struct ctdb_db_context *ctdb_db,
439                                   const char *given_reason,/* NULL means healthy */
440                                   int num_healthy_nodes)
441 {
442         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
443         int ret;
444         TDB_DATA key;
445         TDB_DATA val;
446         char *new_reason = NULL;
447         char *old_reason = NULL;
448
449         ret = tdb_transaction_start(tdb);
450         if (ret != 0) {
451                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
452                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
453                 return -1;
454         }
455
456         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
457         if (ret != 0) {
458                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
459                                    ctdb_db->db_name, ret));
460                 return -1;
461         }
462         old_reason = ctdb_db->unhealthy_reason;
463
464         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
465         key.dsize = strlen(ctdb_db->db_name);
466
467         if (given_reason) {
468                 new_reason = talloc_strdup(ctdb_db, given_reason);
469                 if (new_reason == NULL) {
470                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
471                                           given_reason));
472                         return -1;
473                 }
474         } else if (old_reason && num_healthy_nodes == 0) {
475                 /*
476                  * If the reason indicates ok, but there where no healthy nodes
477                  * available, that it means, we have not recovered valid content
478                  * of the db. So if there's an old reason, prefix it with
479                  * "NO-HEALTHY-NODES - "
480                  */
481                 const char *prefix;
482
483 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
484                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
485                 if (ret != 0) {
486                         prefix = _TMP_PREFIX;
487                 } else {
488                         prefix = "";
489                 }
490                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
491                                          prefix, old_reason);
492                 if (new_reason == NULL) {
493                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
494                                           prefix, old_reason));
495                         return -1;
496                 }
497 #undef _TMP_PREFIX
498         }
499
500         if (new_reason) {
501                 val.dptr = discard_const_p(uint8_t, new_reason);
502                 val.dsize = strlen(new_reason);
503
504                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
505                 if (ret != 0) {
506                         tdb_transaction_cancel(tdb);
507                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
508                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
509                                            ret, tdb_errorstr(tdb)));
510                         talloc_free(new_reason);
511                         return -1;
512                 }
513                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
514                                    ctdb_db->db_name, new_reason));
515         } else if (old_reason) {
516                 ret = tdb_delete(tdb, key);
517                 if (ret != 0) {
518                         tdb_transaction_cancel(tdb);
519                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
520                                            tdb_name(tdb), ctdb_db->db_name,
521                                            ret, tdb_errorstr(tdb)));
522                         talloc_free(new_reason);
523                         return -1;
524                 }
525                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
526                                    ctdb_db->db_name));
527         }
528
529         ret = tdb_transaction_commit(tdb);
530         if (ret != TDB_SUCCESS) {
531                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
532                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
533                 talloc_free(new_reason);
534                 return -1;
535         }
536
537         talloc_free(old_reason);
538         ctdb_db->unhealthy_reason = new_reason;
539
540         return 0;
541 }
542
543 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
544                                      struct ctdb_db_context *ctdb_db)
545 {
546         time_t now = time(NULL);
547         char *new_path;
548         char *new_reason;
549         int ret;
550         struct tm *tm;
551
552         tm = gmtime(&now);
553
554         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
555         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
556                                    "%04u%02u%02u%02u%02u%02u.0Z",
557                                    ctdb_db->db_path,
558                                    tm->tm_year+1900, tm->tm_mon+1,
559                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
560                                    tm->tm_sec);
561         if (new_path == NULL) {
562                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
563                 return -1;
564         }
565
566         new_reason = talloc_asprintf(ctdb_db,
567                                      "ERROR - Backup of corrupted TDB in '%s'",
568                                      new_path);
569         if (new_reason == NULL) {
570                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
571                 return -1;
572         }
573         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
574         talloc_free(new_reason);
575         if (ret != 0) {
576                 DEBUG(DEBUG_CRIT,(__location__
577                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
578                                  ctdb_db->db_path));
579                 return -1;
580         }
581
582         ret = rename(ctdb_db->db_path, new_path);
583         if (ret != 0) {
584                 DEBUG(DEBUG_CRIT,(__location__
585                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
586                                   ctdb_db->db_path, new_path,
587                                   errno, strerror(errno)));
588                 talloc_free(new_path);
589                 return -1;
590         }
591
592         DEBUG(DEBUG_CRIT,(__location__
593                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
594                          ctdb_db->db_path, new_path));
595         talloc_free(new_path);
596         return 0;
597 }
598
599 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
600 {
601         struct ctdb_db_context *ctdb_db;
602         int ret;
603         int ok = 0;
604         int fail = 0;
605
606         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
607                 if (!ctdb_db->persistent) {
608                         continue;
609                 }
610
611                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
612                 if (ret != 0) {
613                         DEBUG(DEBUG_ALERT,(__location__
614                                            " load persistent health for '%s' failed\n",
615                                            ctdb_db->db_path));
616                         return -1;
617                 }
618
619                 if (ctdb_db->unhealthy_reason == NULL) {
620                         ok++;
621                         DEBUG(DEBUG_INFO,(__location__
622                                    " persistent db '%s' healthy\n",
623                                    ctdb_db->db_path));
624                         continue;
625                 }
626
627                 fail++;
628                 DEBUG(DEBUG_ALERT,(__location__
629                                    " persistent db '%s' unhealthy: %s\n",
630                                    ctdb_db->db_path,
631                                    ctdb_db->unhealthy_reason));
632         }
633         DEBUG(DEBUG_NOTICE,
634               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
635                ok, fail));
636
637         if (fail != 0) {
638                 return -1;
639         }
640
641         return 0;
642 }
643
644
645 /*
646   mark a database - as healthy
647  */
648 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
649 {
650         uint32_t db_id = *(uint32_t *)indata.dptr;
651         struct ctdb_db_context *ctdb_db;
652         int ret;
653         bool may_recover = false;
654
655         ctdb_db = find_ctdb_db(ctdb, db_id);
656         if (!ctdb_db) {
657                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
658                 return -1;
659         }
660
661         if (ctdb_db->unhealthy_reason) {
662                 may_recover = true;
663         }
664
665         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
666         if (ret != 0) {
667                 DEBUG(DEBUG_ERR,(__location__
668                                  " ctdb_update_persistent_health(%s) failed\n",
669                                  ctdb_db->db_name));
670                 return -1;
671         }
672
673         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
674                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
675                                   ctdb_db->db_name));
676                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
677         }
678
679         return 0;
680 }
681
682 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
683                                    TDB_DATA indata,
684                                    TDB_DATA *outdata)
685 {
686         uint32_t db_id = *(uint32_t *)indata.dptr;
687         struct ctdb_db_context *ctdb_db;
688         int ret;
689
690         ctdb_db = find_ctdb_db(ctdb, db_id);
691         if (!ctdb_db) {
692                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
693                 return -1;
694         }
695
696         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
697         if (ret != 0) {
698                 DEBUG(DEBUG_ERR,(__location__
699                                  " ctdb_load_persistent_health(%s) failed\n",
700                                  ctdb_db->db_name));
701                 return -1;
702         }
703
704         *outdata = tdb_null;
705         if (ctdb_db->unhealthy_reason) {
706                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
707                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
708         }
709
710         return 0;
711 }
712
713
714 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
715 {
716         char *ropath;
717
718         if (ctdb_db->readonly) {
719                 return 0;
720         }
721
722         if (ctdb_db->persistent) {
723                 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
724                 return -1;
725         }
726
727         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
728         if (ropath == NULL) {
729                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
730                 return -1;
731         }
732         ctdb_db->rottdb = tdb_open(ropath, 
733                               ctdb->tunable.database_hash_size, 
734                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
735                               O_CREAT|O_RDWR, 0600);
736         if (ctdb_db->rottdb == NULL) {
737                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
738                 talloc_free(ropath);
739                 return -1;
740         }
741
742         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
743
744         ctdb_db->readonly = true;
745
746         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
747
748         talloc_free(ropath);
749         return 0;
750 }
751
752 /*
753   attach to a database, handling both persistent and non-persistent databases
754   return 0 on success, -1 on failure
755  */
756 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
757                              bool persistent, const char *unhealthy_reason,
758                              bool jenkinshash, bool mutexes)
759 {
760         struct ctdb_db_context *ctdb_db, *tmp_db;
761         int ret;
762         struct TDB_DATA key;
763         unsigned tdb_flags;
764         int mode = 0600;
765         int remaining_tries = 0;
766
767         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
768         CTDB_NO_MEMORY(ctdb, ctdb_db);
769
770         ctdb_db->ctdb = ctdb;
771         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
772         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
773
774         key.dsize = strlen(db_name)+1;
775         key.dptr  = discard_const(db_name);
776         ctdb_db->db_id = ctdb_hash(&key);
777         ctdb_db->persistent = persistent;
778
779         if (!ctdb_db->persistent) {
780                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
781                 if (ctdb_db->delete_queue == NULL) {
782                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
783                 }
784
785                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
786         }
787
788         /* check for hash collisions */
789         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
790                 if (tmp_db->db_id == ctdb_db->db_id) {
791                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
792                                  tmp_db->db_id, db_name, tmp_db->db_name));
793                         talloc_free(ctdb_db);
794                         return -1;
795                 }
796         }
797
798         if (persistent) {
799                 if (unhealthy_reason) {
800                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
801                                                             unhealthy_reason, 0);
802                         if (ret != 0) {
803                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
804                                                    ctdb_db->db_name, unhealthy_reason, ret));
805                                 talloc_free(ctdb_db);
806                                 return -1;
807                         }
808                 }
809
810                 if (ctdb->max_persistent_check_errors > 0) {
811                         remaining_tries = 1;
812                 }
813                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
814                         remaining_tries = 0;
815                 }
816
817                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
818                 if (ret != 0) {
819                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
820                                    ctdb_db->db_name, ret));
821                         talloc_free(ctdb_db);
822                         return -1;
823                 }
824         }
825
826         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
827                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
828                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
829                 talloc_free(ctdb_db);
830                 return -1;
831         }
832
833         if (ctdb_db->unhealthy_reason) {
834                 /* this is just a warning, but we want that in the log file! */
835                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
836                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
837         }
838
839         /* open the database */
840         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
841                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
842                                            db_name, ctdb->pnn);
843
844         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
845         if (ctdb->valgrinding) {
846                 tdb_flags |= TDB_NOMMAP;
847         }
848         tdb_flags |= TDB_DISALLOW_NESTING;
849         if (jenkinshash) {
850                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
851         }
852 #ifdef TDB_MUTEX_LOCKING
853         if (ctdb->tunable.mutex_enabled && mutexes &&
854             tdb_runtime_check_for_robust_mutexes()) {
855                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
856         }
857 #endif
858
859 again:
860         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
861                                       ctdb->tunable.database_hash_size, 
862                                       tdb_flags, 
863                                       O_CREAT|O_RDWR, mode);
864         if (ctdb_db->ltdb == NULL) {
865                 struct stat st;
866                 int saved_errno = errno;
867
868                 if (!persistent) {
869                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
870                                           ctdb_db->db_path,
871                                           saved_errno,
872                                           strerror(saved_errno)));
873                         talloc_free(ctdb_db);
874                         return -1;
875                 }
876
877                 if (remaining_tries == 0) {
878                         DEBUG(DEBUG_CRIT,(__location__
879                                           "Failed to open persistent tdb '%s': %d - %s\n",
880                                           ctdb_db->db_path,
881                                           saved_errno,
882                                           strerror(saved_errno)));
883                         talloc_free(ctdb_db);
884                         return -1;
885                 }
886
887                 ret = stat(ctdb_db->db_path, &st);
888                 if (ret != 0) {
889                         DEBUG(DEBUG_CRIT,(__location__
890                                           "Failed to open persistent tdb '%s': %d - %s\n",
891                                           ctdb_db->db_path,
892                                           saved_errno,
893                                           strerror(saved_errno)));
894                         talloc_free(ctdb_db);
895                         return -1;
896                 }
897
898                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
899                 if (ret != 0) {
900                         DEBUG(DEBUG_CRIT,(__location__
901                                           "Failed to open persistent tdb '%s': %d - %s\n",
902                                           ctdb_db->db_path,
903                                           saved_errno,
904                                           strerror(saved_errno)));
905                         talloc_free(ctdb_db);
906                         return -1;
907                 }
908
909                 remaining_tries--;
910                 mode = st.st_mode;
911                 goto again;
912         }
913
914         if (!persistent) {
915                 ctdb_check_db_empty(ctdb_db);
916         } else {
917                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
918                 if (ret != 0) {
919                         int fd;
920                         struct stat st;
921
922                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
923                                           ctdb_db->db_path, ret,
924                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
925                         if (remaining_tries == 0) {
926                                 talloc_free(ctdb_db);
927                                 return -1;
928                         }
929
930                         fd = tdb_fd(ctdb_db->ltdb->tdb);
931                         ret = fstat(fd, &st);
932                         if (ret != 0) {
933                                 DEBUG(DEBUG_CRIT,(__location__
934                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
935                                                   ctdb_db->db_path,
936                                                   errno,
937                                                   strerror(errno)));
938                                 talloc_free(ctdb_db);
939                                 return -1;
940                         }
941
942                         /* close the TDB */
943                         talloc_free(ctdb_db->ltdb);
944                         ctdb_db->ltdb = NULL;
945
946                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
947                         if (ret != 0) {
948                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
949                                                   ctdb_db->db_path));
950                                 talloc_free(ctdb_db);
951                                 return -1;
952                         }
953
954                         remaining_tries--;
955                         mode = st.st_mode;
956                         goto again;
957                 }
958         }
959
960         /* set up a rb tree we can use to track which records we have a 
961            fetch-lock in-flight for so we can defer any additional calls
962            for the same record.
963          */
964         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
965         if (ctdb_db->deferred_fetch == NULL) {
966                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
967                 talloc_free(ctdb_db);
968                 return -1;
969         }
970
971         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
972         if (ctdb_db->defer_dmaster == NULL) {
973                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
974                                   ctdb_db->db_name));
975                 talloc_free(ctdb_db);
976                 return -1;
977         }
978
979         DLIST_ADD(ctdb->db_list, ctdb_db);
980
981         /* setting this can help some high churn databases */
982         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
983
984         /* 
985            all databases support the "null" function. we need this in
986            order to do forced migration of records
987         */
988         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
989         if (ret != 0) {
990                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
991                 talloc_free(ctdb_db);
992                 return -1;
993         }
994
995         /* 
996            all databases support the "fetch" function. we need this
997            for efficient Samba3 ctdb fetch
998         */
999         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1000         if (ret != 0) {
1001                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1002                 talloc_free(ctdb_db);
1003                 return -1;
1004         }
1005
1006         /* 
1007            all databases support the "fetch_with_header" function. we need this
1008            for efficient readonly record fetches
1009         */
1010         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1011         if (ret != 0) {
1012                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1013                 talloc_free(ctdb_db);
1014                 return -1;
1015         }
1016
1017         ret = ctdb_vacuum_init(ctdb_db);
1018         if (ret != 0) {
1019                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1020                                   "database '%s'\n", ctdb_db->db_name));
1021                 talloc_free(ctdb_db);
1022                 return -1;
1023         }
1024
1025         ctdb_db->generation = ctdb->vnn_map->generation;
1026
1027         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1028                             ctdb_db->db_path, tdb_flags));
1029
1030         /* success */
1031         return 0;
1032 }
1033
1034
/*
  a DB attach control that has been put on hold; instances are linked
  into ctdb->deferred_attach
 */
struct ctdb_deferred_attach_context {
	/* list linkage, maintained by DLIST_ADD/DLIST_REMOVE */
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	/* daemon context whose deferred_attach list holds this entry */
	struct ctdb_context *ctdb;
	/* the held-back control packet, talloc_steal'ed onto this context */
	struct ctdb_req_control_old *c;
};
1040
1041
1042 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1043 {
1044         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1045
1046         return 0;
1047 }
1048
1049 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1050                                          struct tevent_timer *te,
1051                                          struct timeval t, void *private_data)
1052 {
1053         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1054         struct ctdb_context *ctdb = da_ctx->ctdb;
1055
1056         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1057         talloc_free(da_ctx);
1058 }
1059
1060 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1061                                           struct tevent_timer *te,
1062                                           struct timeval t, void *private_data)
1063 {
1064         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1065         struct ctdb_context *ctdb = da_ctx->ctdb;
1066
1067         /* This talloc-steals the packet ->c */
1068         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1069         talloc_free(da_ctx);
1070 }
1071
1072 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1073 {
1074         struct ctdb_deferred_attach_context *da_ctx;
1075
1076         /* call it from the main event loop as soon as the current event 
1077            finishes.
1078          */
1079         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1080                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1081                 tevent_add_timer(ctdb->ev, da_ctx,
1082                                  timeval_current_ofs(1,0),
1083                                  ctdb_deferred_attach_callback, da_ctx);
1084         }
1085
1086         return 0;
1087 }
1088
1089 /*
1090   a client has asked to attach a new database
1091  */
1092 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1093                                TDB_DATA *outdata, uint64_t tdb_flags, 
1094                                bool persistent, uint32_t client_id,
1095                                struct ctdb_req_control_old *c,
1096                                bool *async_reply)
1097 {
1098         const char *db_name = (const char *)indata.dptr;
1099         struct ctdb_db_context *db;
1100         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1101         struct ctdb_client *client = NULL;
1102         bool with_jenkinshash, with_mutexes;
1103
1104         if (ctdb->tunable.allow_client_db_attach == 0) {
1105                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1106                                   "AllowClientDBAccess == 0\n", db_name));
1107                 return -1;
1108         }
1109
1110         /* don't allow any local clients to attach while we are in recovery mode
1111          * except for the recovery daemon.
1112          * allow all attach from the network since these are always from remote
1113          * recovery daemons.
1114          */
1115         if (client_id != 0) {
1116                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1117         }
1118         if (client != NULL) {
1119                 /* If the node is inactive it is not part of the cluster
1120                    and we should not allow clients to attach to any
1121                    databases
1122                 */
1123                 if (node->flags & NODE_FLAGS_INACTIVE) {
1124                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1125                         return -1;
1126                 }
1127
1128                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1129                     client->pid != ctdb->recoverd_pid &&
1130                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1131                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1132
1133                         if (da_ctx == NULL) {
1134                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1135                                 return -1;
1136                         }
1137
1138                         da_ctx->ctdb = ctdb;
1139                         da_ctx->c = talloc_steal(da_ctx, c);
1140                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1141                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1142
1143                         tevent_add_timer(ctdb->ev, da_ctx,
1144                                          timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1145                                          ctdb_deferred_attach_timeout, da_ctx);
1146
1147                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1148                         *async_reply = true;
1149                         return 0;
1150                 }
1151         }
1152
1153         /* the client can optionally pass additional tdb flags, but we
1154            only allow a subset of those on the database in ctdb. Note
1155            that tdb_flags is passed in via the (otherwise unused)
1156            srvid to the attach control */
1157 #ifdef TDB_MUTEX_LOCKING
1158         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1159 #else
1160         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1161 #endif
1162
1163         /* see if we already have this name */
1164         db = ctdb_db_handle(ctdb, db_name);
1165         if (db) {
1166                 if (db->persistent != persistent) {
1167                         DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1168                                           "database %s\n", persistent ? "" : "non-",
1169                                           db-> persistent ? "" : "non-", db_name));
1170                         return -1;
1171                 }
1172                 outdata->dptr  = (uint8_t *)&db->db_id;
1173                 outdata->dsize = sizeof(db->db_id);
1174                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1175                 return 0;
1176         }
1177
1178         with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1179 #ifdef TDB_MUTEX_LOCKING
1180         with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1181 #else
1182         with_mutexes = false;
1183 #endif
1184
1185         if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1186                               with_jenkinshash, with_mutexes) != 0) {
1187                 return -1;
1188         }
1189
1190         db = ctdb_db_handle(ctdb, db_name);
1191         if (!db) {
1192                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1193                 return -1;
1194         }
1195
1196         /* remember the flags the client has specified */
1197         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1198
1199         outdata->dptr  = (uint8_t *)&db->db_id;
1200         outdata->dsize = sizeof(db->db_id);
1201
1202         /* Try to ensure it's locked in mem */
1203         lockdown_memory(ctdb->valgrinding);
1204
1205         /* tell all the other nodes about this database */
1206         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1207                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1208                                                 CTDB_CONTROL_DB_ATTACH,
1209                                  0, CTDB_CTRL_FLAG_NOREPLY,
1210                                  indata, NULL, NULL);
1211
1212         /* success */
1213         return 0;
1214 }
1215
1216 /*
1217  * a client has asked to detach from a database
1218  */
1219 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1220                                uint32_t client_id)
1221 {
1222         uint32_t db_id;
1223         struct ctdb_db_context *ctdb_db;
1224         struct ctdb_client *client = NULL;
1225
1226         db_id = *(uint32_t *)indata.dptr;
1227         ctdb_db = find_ctdb_db(ctdb, db_id);
1228         if (ctdb_db == NULL) {
1229                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1230                                   db_id));
1231                 return -1;
1232         }
1233
1234         if (ctdb->tunable.allow_client_db_attach == 1) {
1235                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1236                                   "Clients are allowed access to databases "
1237                                   "(AllowClientDBAccess == 1)\n",
1238                                   ctdb_db->db_name));
1239                 return -1;
1240         }
1241
1242         if (ctdb_db->persistent) {
1243                 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1244                                   "denied\n", ctdb_db->db_name));
1245                 return -1;
1246         }
1247
1248         /* Cannot detach from database when in recovery */
1249         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1250                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1251                 return -1;
1252         }
1253
1254         /* If a control comes from a client, then broadcast it to all nodes.
1255          * Do the actual detach only if the control comes from other daemons.
1256          */
1257         if (client_id != 0) {
1258                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1259                 if (client != NULL) {
1260                         /* forward the control to all the nodes */
1261                         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1262                                                  CTDB_CONTROL_DB_DETACH, 0,
1263                                                  CTDB_CTRL_FLAG_NOREPLY,
1264                                                  indata, NULL, NULL);
1265                         return 0;
1266                 }
1267                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1268                                   "for database '%s'\n", ctdb_db->db_name));
1269                 return -1;
1270         }
1271
1272         /* Detach database from recoverd */
1273         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1274                                      CTDB_SRVID_DETACH_DATABASE,
1275                                      indata) != 0) {
1276                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1277                 return -1;
1278         }
1279
1280         /* Disable vacuuming and drop all vacuuming data */
1281         talloc_free(ctdb_db->vacuum_handle);
1282         talloc_free(ctdb_db->delete_queue);
1283
1284         /* Terminate any deferred fetch */
1285         talloc_free(ctdb_db->deferred_fetch);
1286
1287         /* Terminate any traverses */
1288         while (ctdb_db->traverse) {
1289                 talloc_free(ctdb_db->traverse);
1290         }
1291
1292         /* Terminate any revokes */
1293         while (ctdb_db->revokechild_active) {
1294                 talloc_free(ctdb_db->revokechild_active);
1295         }
1296
1297         /* Free readonly tracking database */
1298         if (ctdb_db->readonly) {
1299                 talloc_free(ctdb_db->rottdb);
1300         }
1301
1302         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1303
1304         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1305                              ctdb_db->db_name));
1306         talloc_free(ctdb_db);
1307
1308         return 0;
1309 }
1310
1311 /*
1312   attach to all existing persistent databases
1313  */
1314 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1315                                   const char *unhealthy_reason)
1316 {
1317         DIR *d;
1318         struct dirent *de;
1319
1320         /* open the persistent db directory and scan it for files */
1321         d = opendir(ctdb->db_directory_persistent);
1322         if (d == NULL) {
1323                 return 0;
1324         }
1325
1326         while ((de=readdir(d))) {
1327                 char *p, *s, *q;
1328                 size_t len = strlen(de->d_name);
1329                 uint32_t node;
1330                 int invalid_name = 0;
1331                 
1332                 s = talloc_strdup(ctdb, de->d_name);
1333                 if (s == NULL) {
1334                         closedir(d);
1335                         CTDB_NO_MEMORY(ctdb, s);
1336                 }
1337
1338                 /* only accept names ending in .tdb */
1339                 p = strstr(s, ".tdb.");
1340                 if (len < 7 || p == NULL) {
1341                         talloc_free(s);
1342                         continue;
1343                 }
1344
1345                 /* only accept names ending with .tdb. and any number of digits */
1346                 q = p+5;
1347                 while (*q != 0 && invalid_name == 0) {
1348                         if (!isdigit(*q++)) {
1349                                 invalid_name = 1;
1350                         }
1351                 }
1352                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1353                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1354                         talloc_free(s);
1355                         continue;
1356                 }
1357                 p[4] = 0;
1358
1359                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1360                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1361                         closedir(d);
1362                         talloc_free(s);
1363                         return -1;
1364                 }
1365
1366                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1367
1368                 talloc_free(s);
1369         }
1370         closedir(d);
1371         return 0;
1372 }
1373
1374 int ctdb_attach_databases(struct ctdb_context *ctdb)
1375 {
1376         int ret;
1377         char *persistent_health_path = NULL;
1378         char *unhealthy_reason = NULL;
1379         bool first_try = true;
1380
1381         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1382                                                  ctdb->db_directory_state,
1383                                                  PERSISTENT_HEALTH_TDB,
1384                                                  ctdb->pnn);
1385         if (persistent_health_path == NULL) {
1386                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1387                 return -1;
1388         }
1389
1390 again:
1391
1392         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1393                                                    0, TDB_DISALLOW_NESTING,
1394                                                    O_CREAT | O_RDWR, 0600);
1395         if (ctdb->db_persistent_health == NULL) {
1396                 struct tdb_wrap *tdb;
1397
1398                 if (!first_try) {
1399                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1400                                           persistent_health_path,
1401                                           errno,
1402                                           strerror(errno)));
1403                         talloc_free(persistent_health_path);
1404                         talloc_free(unhealthy_reason);
1405                         return -1;
1406                 }
1407                 first_try = false;
1408
1409                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1410                                                    persistent_health_path,
1411                                                    "was cleared after a failure",
1412                                                    "manual verification needed");
1413                 if (unhealthy_reason == NULL) {
1414                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1415                         talloc_free(persistent_health_path);
1416                         return -1;
1417                 }
1418
1419                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1420                                   persistent_health_path));
1421                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1422                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1423                                     O_CREAT | O_RDWR, 0600);
1424                 if (tdb) {
1425                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1426                                           persistent_health_path,
1427                                           errno,
1428                                           strerror(errno)));
1429                         talloc_free(persistent_health_path);
1430                         talloc_free(unhealthy_reason);
1431                         return -1;
1432                 }
1433
1434                 talloc_free(tdb);
1435                 goto again;
1436         }
1437         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1438         if (ret != 0) {
1439                 struct tdb_wrap *tdb;
1440
1441                 talloc_free(ctdb->db_persistent_health);
1442                 ctdb->db_persistent_health = NULL;
1443
1444                 if (!first_try) {
1445                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1446                                           persistent_health_path));
1447                         talloc_free(persistent_health_path);
1448                         talloc_free(unhealthy_reason);
1449                         return -1;
1450                 }
1451                 first_try = false;
1452
1453                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1454                                                    persistent_health_path,
1455                                                    "was cleared after a failure",
1456                                                    "manual verification needed");
1457                 if (unhealthy_reason == NULL) {
1458                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1459                         talloc_free(persistent_health_path);
1460                         return -1;
1461                 }
1462
1463                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1464                                   persistent_health_path));
1465                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1466                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1467                                     O_CREAT | O_RDWR, 0600);
1468                 if (tdb) {
1469                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1470                                           persistent_health_path,
1471                                           errno,
1472                                           strerror(errno)));
1473                         talloc_free(persistent_health_path);
1474                         talloc_free(unhealthy_reason);
1475                         return -1;
1476                 }
1477
1478                 talloc_free(tdb);
1479                 goto again;
1480         }
1481         talloc_free(persistent_health_path);
1482
1483         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1484         talloc_free(unhealthy_reason);
1485         if (ret != 0) {
1486                 return ret;
1487         }
1488
1489         return 0;
1490 }
1491
1492 /*
1493   called when a broadcast seqnum update comes in
1494  */
1495 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1496 {
1497         struct ctdb_db_context *ctdb_db;
1498         if (srcnode == ctdb->pnn) {
1499                 /* don't update ourselves! */
1500                 return 0;
1501         }
1502
1503         ctdb_db = find_ctdb_db(ctdb, db_id);
1504         if (!ctdb_db) {
1505                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1506                 return -1;
1507         }
1508
1509         if (ctdb_db->unhealthy_reason) {
1510                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1511                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1512                 return -1;
1513         }
1514
1515         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1516         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1517         return 0;
1518 }
1519
1520 /*
1521   timer to check for seqnum changes in a ltdb and propogate them
1522  */
1523 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1524                                    struct tevent_timer *te,
1525                                    struct timeval t, void *p)
1526 {
1527         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1528         struct ctdb_context *ctdb = ctdb_db->ctdb;
1529         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1530         if (new_seqnum != ctdb_db->seqnum) {
1531                 /* something has changed - propogate it */
1532                 TDB_DATA data;
1533                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1534                 data.dsize = sizeof(uint32_t);
1535                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1536                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1537                                          data, NULL, NULL);             
1538         }
1539         ctdb_db->seqnum = new_seqnum;
1540
1541         /* setup a new timer */
1542         ctdb_db->seqnum_update =
1543                 tevent_add_timer(ctdb->ev, ctdb_db,
1544                                  timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1545                                                      (ctdb->tunable.seqnum_interval%1000)*1000),
1546                                  ctdb_ltdb_seqnum_check, ctdb_db);
1547 }
1548
1549 /*
1550   enable seqnum handling on this db
1551  */
1552 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1553 {
1554         struct ctdb_db_context *ctdb_db;
1555         ctdb_db = find_ctdb_db(ctdb, db_id);
1556         if (!ctdb_db) {
1557                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1558                 return -1;
1559         }
1560
1561         if (ctdb_db->seqnum_update == NULL) {
1562                 ctdb_db->seqnum_update = tevent_add_timer(
1563                         ctdb->ev, ctdb_db,
1564                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1565                                             (ctdb->tunable.seqnum_interval%1000)*1000),
1566                         ctdb_ltdb_seqnum_check, ctdb_db);
1567         }
1568
1569         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1570         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1571         return 0;
1572 }
1573
1574 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1575 {
1576         if (ctdb_db->sticky) {
1577                 return 0;
1578         }
1579
1580         if (ctdb_db->persistent) {
1581                 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1582                 return -1;
1583         }
1584
1585         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1586
1587         ctdb_db->sticky = true;
1588
1589         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1590
1591         return 0;
1592 }
1593
1594 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1595 {
1596         struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1597         int i;
1598
1599         for (i=0; i<MAX_HOT_KEYS; i++) {
1600                 if (s->hot_keys[i].key.dsize > 0) {
1601                         talloc_free(s->hot_keys[i].key.dptr);
1602                 }
1603         }
1604
1605         ZERO_STRUCT(ctdb_db->statistics);
1606 }
1607
1608 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1609                                 uint32_t db_id,
1610                                 TDB_DATA *outdata)
1611 {
1612         struct ctdb_db_context *ctdb_db;
1613         struct ctdb_db_statistics_old *stats;
1614         int i;
1615         int len;
1616         char *ptr;
1617
1618         ctdb_db = find_ctdb_db(ctdb, db_id);
1619         if (!ctdb_db) {
1620                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1621                 return -1;
1622         }
1623
1624         len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1625         for (i = 0; i < MAX_HOT_KEYS; i++) {
1626                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1627         }
1628
1629         stats = talloc_size(outdata, len);
1630         if (stats == NULL) {
1631                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1632                 return -1;
1633         }
1634
1635         memcpy(stats, &ctdb_db->statistics,
1636                offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1637
1638         stats->num_hot_keys = MAX_HOT_KEYS;
1639
1640         ptr = &stats->hot_keys_wire[0];
1641         for (i = 0; i < MAX_HOT_KEYS; i++) {
1642                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1643                        ctdb_db->statistics.hot_keys[i].key.dsize);
1644                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1645         }
1646
1647         outdata->dptr  = (uint8_t *)stats;
1648         outdata->dsize = len;
1649
1650         return 0;
1651 }