9ac2217a34a8251d6997e758c64fbc3ae18898fd
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "tdb.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
33
34 /**
35  * write a record to a normal database
36  *
37  * This is the server-variant of the ctdb_ltdb_store function.
38  * It contains logic to determine whether a record should be
39  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
40  * controls to the local ctdb daemon if apporpriate.
41  */
42 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
43                                   TDB_DATA key,
44                                   struct ctdb_ltdb_header *header,
45                                   TDB_DATA data)
46 {
47         struct ctdb_context *ctdb = ctdb_db->ctdb;
48         TDB_DATA rec;
49         int ret;
50         bool seqnum_suppressed = false;
51         bool keep = false;
52         bool schedule_for_deletion = false;
53         bool remove_from_delete_queue = false;
54         uint32_t lmaster;
55
56         if (ctdb->flags & CTDB_FLAG_TORTURE) {
57                 struct ctdb_ltdb_header *h2;
58                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
59                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
60                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
61                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
62                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
63                 }
64                 if (rec.dptr) free(rec.dptr);
65         }
66
67         if (ctdb->vnn_map == NULL) {
68                 /*
69                  * Called from a client: always store the record
70                  * Also don't call ctdb_lmaster since it uses the vnn_map!
71                  */
72                 keep = true;
73                 goto store;
74         }
75
76         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
77
78         /*
79          * If we migrate an empty record off to another node
80          * and the record has not been migrated with data,
81          * delete the record instead of storing the empty record.
82          */
83         if (data.dsize != 0) {
84                 keep = true;
85         } else if (header->flags & CTDB_REC_RO_FLAGS) {
86                 keep = true;
87         } else if (ctdb_db->persistent) {
88                 keep = true;
89         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
90                 /*
91                  * The record is not created by the client but
92                  * automatically by the ctdb_ltdb_fetch logic that
93                  * creates a record with an initial header in the
94                  * ltdb before trying to migrate the record from
95                  * the current lmaster. Keep it instead of trying
96                  * to delete the non-existing record...
97                  */
98                 keep = true;
99                 schedule_for_deletion = true;
100         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
101                 keep = true;
102         } else if (ctdb_db->ctdb->pnn == lmaster) {
103                 /*
104                  * If we are lmaster, then we usually keep the record.
105                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
106                  * and the record is empty and has never been migrated
107                  * with data, then we should delete it instead of storing it.
108                  * This is part of the vacuuming process.
109                  *
110                  * The reason that we usually need to store even empty records
111                  * on the lmaster is that a client operating directly on the
112                  * lmaster (== dmaster) expects the local copy of the record to
113                  * exist after successful ctdb migrate call. If the record does
114                  * not exist, the client goes into a migrate loop and eventually
115                  * fails. So storing the empty record makes sure that we do not
116                  * need to change the client code.
117                  */
118                 if ((header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) &&
119                     (ctdb_db->ctdb->pnn == header->dmaster)) {
120                         keep = true;
121                         schedule_for_deletion = true;
122                 }
123                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
124                         keep = true;
125                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
126                         keep = true;
127                 }
128         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
129                 keep = true;
130         }
131
132         if (keep) {
133                 if (!ctdb_db->persistent &&
134                     (ctdb_db->ctdb->pnn == header->dmaster) &&
135                     !(header->flags & CTDB_REC_RO_FLAGS))
136                 {
137                         header->rsn++;
138
139                         if (data.dsize == 0) {
140                                 schedule_for_deletion = true;
141                         }
142                 }
143                 remove_from_delete_queue = !schedule_for_deletion;
144         }
145
146 store:
147         /*
148          * The VACUUM_MIGRATED flag is only set temporarily for
149          * the above logic when the record was retrieved by a
150          * VACUUM_MIGRATE call and should not be stored in the
151          * database.
152          *
153          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
154          * and there are two cases in which the corresponding record
155          * is stored in the local database:
156          * 1. The record has been migrated with data in the past
157          *    (the MIGRATED_WITH_DATA record flag is set).
158          * 2. The record has been filled with data again since it
159          *    had been submitted in the VACUUM_FETCH message to the
160          *    lmaster.
161          * For such records it is important to not store the
162          * VACUUM_MIGRATED flag in the database.
163          */
164         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
165
166         /*
167          * Similarly, clear the AUTOMATIC flag which should not enter
168          * the local database copy since this would require client
169          * modifications to clear the flag when the client stores
170          * the record.
171          */
172         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
173
174         rec.dsize = sizeof(*header) + data.dsize;
175         rec.dptr = talloc_size(ctdb, rec.dsize);
176         CTDB_NO_MEMORY(ctdb, rec.dptr);
177
178         memcpy(rec.dptr, header, sizeof(*header));
179         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
180
181         /* Databases with seqnum updates enabled only get their seqnum
182            changes when/if we modify the data */
183         if (ctdb_db->seqnum_update != NULL) {
184                 TDB_DATA old;
185                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
186
187                 if ( (old.dsize == rec.dsize)
188                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
189                           rec.dptr+sizeof(struct ctdb_ltdb_header),
190                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
191                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
192                         seqnum_suppressed = true;
193                 }
194                 if (old.dptr) free(old.dptr);
195         }
196
197         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
198                             ctdb_db->db_name,
199                             keep?"storing":"deleting",
200                             ctdb_hash(&key)));
201
202         if (keep) {
203                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
204         } else {
205                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
206         }
207
208         if (ret != 0) {
209                 int lvl = DEBUG_ERR;
210
211                 if (keep == false &&
212                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
213                 {
214                         lvl = DEBUG_DEBUG;
215                 }
216
217                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
218                             "%d - %s\n",
219                             ctdb_db->db_name,
220                             keep?"store":"delete", ret,
221                             tdb_errorstr(ctdb_db->ltdb->tdb)));
222
223                 schedule_for_deletion = false;
224                 remove_from_delete_queue = false;
225         }
226         if (seqnum_suppressed) {
227                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
228         }
229
230         talloc_free(rec.dptr);
231
232         if (schedule_for_deletion) {
233                 int ret2;
234                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
235                 if (ret2 != 0) {
236                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
237                 }
238         }
239
240         if (remove_from_delete_queue) {
241                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
242         }
243
244         return ret;
245 }
246
/*
  state kept for a request that is deferred until a record chainlock
  has been obtained; handed to lock_fetch_callback()
 */
struct lock_fetch_state {
        /* daemon context; its vnn_map generation is compared in the callback */
        struct ctdb_context *ctdb;
        /* function the deferred packet is re-submitted to */
        void (*recv_pkt)(void *, struct ctdb_req_header *);
        /* first argument passed to recv_pkt */
        void *recv_context;
        /* the deferred request; freed by the callback if it is discarded */
        struct ctdb_req_header *hdr;
        /* vnn_map generation at the time the request was queued */
        uint32_t generation;
        /* when true the callback skips the generation comparison */
        bool ignore_generation;
};
255
256 /*
257   called when we should retry the operation
258  */
259 static void lock_fetch_callback(void *p, bool locked)
260 {
261         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
262         if (!state->ignore_generation &&
263             state->generation != state->ctdb->vnn_map->generation) {
264                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
265                 talloc_free(state->hdr);
266                 return;
267         }
268         state->recv_pkt(state->recv_context, state->hdr);
269         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
270 }
271
272
273 /*
274   do a non-blocking ltdb_lock, deferring this ctdb request until we
275   have the chainlock
276
277   It does the following:
278
279    1) tries to get the chainlock. If it succeeds, then it returns 0
280
281    2) if it fails to get a chainlock immediately then it sets up a
282    non-blocking chainlock via ctdb_lock_record, and when it gets the
283    chainlock it re-submits this ctdb request to the main packet
284    receive function.
285
286    This effectively queues all ctdb requests that cannot be
287    immediately satisfied until it can get the lock. This means that
288    the main ctdb daemon will not block waiting for a chainlock held by
289    a client
290
291    There are 3 possible return values:
292
293        0:    means that it got the lock immediately.
294       -1:    means that it failed to get the lock, and won't retry
295       -2:    means that it failed to get the lock immediately, but will retry
296  */
297 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
298                            TDB_DATA key, struct ctdb_req_header *hdr,
299                            void (*recv_pkt)(void *, struct ctdb_req_header *),
300                            void *recv_context, bool ignore_generation)
301 {
302         int ret;
303         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
304         struct lock_request *lreq;
305         struct lock_fetch_state *state;
306         
307         ret = tdb_chainlock_nonblock(tdb, key);
308
309         if (ret != 0 &&
310             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
311                 /* a hard failure - don't try again */
312                 return -1;
313         }
314
315         /* when torturing, ensure we test the contended path */
316         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
317             random() % 5 == 0) {
318                 ret = -1;
319                 tdb_chainunlock(tdb, key);
320         }
321
322         /* first the non-contended path */
323         if (ret == 0) {
324                 return 0;
325         }
326
327         state = talloc(hdr, struct lock_fetch_state);
328         state->ctdb = ctdb_db->ctdb;
329         state->hdr = hdr;
330         state->recv_pkt = recv_pkt;
331         state->recv_context = recv_context;
332         state->generation = ctdb_db->ctdb->vnn_map->generation;
333         state->ignore_generation = ignore_generation;
334
335         /* now the contended path */
336         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
337         if (lreq == NULL) {
338                 return -1;
339         }
340
341         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
342            so it won't be freed yet */
343         talloc_steal(state, hdr);
344
345         /* now tell the caller than we will retry asynchronously */
346         return -2;
347 }
348
349 /*
350   a varient of ctdb_ltdb_lock_requeue that also fetches the record
351  */
352 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
353                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
354                                  struct ctdb_req_header *hdr, TDB_DATA *data,
355                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
356                                  void *recv_context, bool ignore_generation)
357 {
358         int ret;
359
360         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
361                                      recv_context, ignore_generation);
362         if (ret == 0) {
363                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
364                 if (ret != 0) {
365                         int uret;
366                         uret = ctdb_ltdb_unlock(ctdb_db, key);
367                         if (uret != 0) {
368                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
369                         }
370                 }
371         }
372         return ret;
373 }
374
375
376 /*
377   paraoid check to see if the db is empty
378  */
379 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
380 {
381         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
382         int count = tdb_traverse_read(tdb, NULL, NULL);
383         if (count != 0) {
384                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
385                          ctdb_db->db_path));
386                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
387         }
388 }
389
390 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
391                                 struct ctdb_db_context *ctdb_db)
392 {
393         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
394         char *old;
395         char *reason = NULL;
396         TDB_DATA key;
397         TDB_DATA val;
398
399         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
400         key.dsize = strlen(ctdb_db->db_name);
401
402         old = ctdb_db->unhealthy_reason;
403         ctdb_db->unhealthy_reason = NULL;
404
405         val = tdb_fetch(tdb, key);
406         if (val.dsize > 0) {
407                 reason = talloc_strndup(ctdb_db,
408                                         (const char *)val.dptr,
409                                         val.dsize);
410                 if (reason == NULL) {
411                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
412                                            (int)val.dsize));
413                         ctdb_db->unhealthy_reason = old;
414                         free(val.dptr);
415                         return -1;
416                 }
417         }
418
419         if (val.dptr) {
420                 free(val.dptr);
421         }
422
423         talloc_free(old);
424         ctdb_db->unhealthy_reason = reason;
425         return 0;
426 }
427
428 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
429                                   struct ctdb_db_context *ctdb_db,
430                                   const char *given_reason,/* NULL means healthy */
431                                   int num_healthy_nodes)
432 {
433         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
434         int ret;
435         TDB_DATA key;
436         TDB_DATA val;
437         char *new_reason = NULL;
438         char *old_reason = NULL;
439
440         ret = tdb_transaction_start(tdb);
441         if (ret != 0) {
442                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
443                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
444                 return -1;
445         }
446
447         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
448         if (ret != 0) {
449                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
450                                    ctdb_db->db_name, ret));
451                 return -1;
452         }
453         old_reason = ctdb_db->unhealthy_reason;
454
455         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
456         key.dsize = strlen(ctdb_db->db_name);
457
458         if (given_reason) {
459                 new_reason = talloc_strdup(ctdb_db, given_reason);
460                 if (new_reason == NULL) {
461                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
462                                           given_reason));
463                         return -1;
464                 }
465         } else if (old_reason && num_healthy_nodes == 0) {
466                 /*
467                  * If the reason indicates ok, but there where no healthy nodes
468                  * available, that it means, we have not recovered valid content
469                  * of the db. So if there's an old reason, prefix it with
470                  * "NO-HEALTHY-NODES - "
471                  */
472                 const char *prefix;
473
474 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
475                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
476                 if (ret != 0) {
477                         prefix = _TMP_PREFIX;
478                 } else {
479                         prefix = "";
480                 }
481                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
482                                          prefix, old_reason);
483                 if (new_reason == NULL) {
484                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
485                                           prefix, old_reason));
486                         return -1;
487                 }
488 #undef _TMP_PREFIX
489         }
490
491         if (new_reason) {
492                 val.dptr = discard_const_p(uint8_t, new_reason);
493                 val.dsize = strlen(new_reason);
494
495                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
496                 if (ret != 0) {
497                         tdb_transaction_cancel(tdb);
498                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
499                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
500                                            ret, tdb_errorstr(tdb)));
501                         talloc_free(new_reason);
502                         return -1;
503                 }
504                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
505                                    ctdb_db->db_name, new_reason));
506         } else if (old_reason) {
507                 ret = tdb_delete(tdb, key);
508                 if (ret != 0) {
509                         tdb_transaction_cancel(tdb);
510                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
511                                            tdb_name(tdb), ctdb_db->db_name,
512                                            ret, tdb_errorstr(tdb)));
513                         talloc_free(new_reason);
514                         return -1;
515                 }
516                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
517                                    ctdb_db->db_name));
518         }
519
520         ret = tdb_transaction_commit(tdb);
521         if (ret != TDB_SUCCESS) {
522                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
523                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
524                 talloc_free(new_reason);
525                 return -1;
526         }
527
528         talloc_free(old_reason);
529         ctdb_db->unhealthy_reason = new_reason;
530
531         return 0;
532 }
533
534 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
535                                      struct ctdb_db_context *ctdb_db)
536 {
537         time_t now = time(NULL);
538         char *new_path;
539         char *new_reason;
540         int ret;
541         struct tm *tm;
542
543         tm = gmtime(&now);
544
545         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
546         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
547                                    "%04u%02u%02u%02u%02u%02u.0Z",
548                                    ctdb_db->db_path,
549                                    tm->tm_year+1900, tm->tm_mon+1,
550                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
551                                    tm->tm_sec);
552         if (new_path == NULL) {
553                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
554                 return -1;
555         }
556
557         new_reason = talloc_asprintf(ctdb_db,
558                                      "ERROR - Backup of corrupted TDB in '%s'",
559                                      new_path);
560         if (new_reason == NULL) {
561                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
562                 return -1;
563         }
564         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
565         talloc_free(new_reason);
566         if (ret != 0) {
567                 DEBUG(DEBUG_CRIT,(__location__
568                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
569                                  ctdb_db->db_path));
570                 return -1;
571         }
572
573         ret = rename(ctdb_db->db_path, new_path);
574         if (ret != 0) {
575                 DEBUG(DEBUG_CRIT,(__location__
576                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
577                                   ctdb_db->db_path, new_path,
578                                   errno, strerror(errno)));
579                 talloc_free(new_path);
580                 return -1;
581         }
582
583         DEBUG(DEBUG_CRIT,(__location__
584                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
585                          ctdb_db->db_path, new_path));
586         talloc_free(new_path);
587         return 0;
588 }
589
590 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
591 {
592         struct ctdb_db_context *ctdb_db;
593         int ret;
594         int ok = 0;
595         int fail = 0;
596
597         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
598                 if (!ctdb_db->persistent) {
599                         continue;
600                 }
601
602                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
603                 if (ret != 0) {
604                         DEBUG(DEBUG_ALERT,(__location__
605                                            " load persistent health for '%s' failed\n",
606                                            ctdb_db->db_path));
607                         return -1;
608                 }
609
610                 if (ctdb_db->unhealthy_reason == NULL) {
611                         ok++;
612                         DEBUG(DEBUG_INFO,(__location__
613                                    " persistent db '%s' healthy\n",
614                                    ctdb_db->db_path));
615                         continue;
616                 }
617
618                 fail++;
619                 DEBUG(DEBUG_ALERT,(__location__
620                                    " persistent db '%s' unhealthy: %s\n",
621                                    ctdb_db->db_path,
622                                    ctdb_db->unhealthy_reason));
623         }
624         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
625               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
626                ok, fail));
627
628         if (fail != 0) {
629                 return -1;
630         }
631
632         return 0;
633 }
634
635
636 /*
637   mark a database - as healthy
638  */
639 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
640 {
641         uint32_t db_id = *(uint32_t *)indata.dptr;
642         struct ctdb_db_context *ctdb_db;
643         int ret;
644         bool may_recover = false;
645
646         ctdb_db = find_ctdb_db(ctdb, db_id);
647         if (!ctdb_db) {
648                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
649                 return -1;
650         }
651
652         if (ctdb_db->unhealthy_reason) {
653                 may_recover = true;
654         }
655
656         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
657         if (ret != 0) {
658                 DEBUG(DEBUG_ERR,(__location__
659                                  " ctdb_update_persistent_health(%s) failed\n",
660                                  ctdb_db->db_name));
661                 return -1;
662         }
663
664         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
665                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
666                                   ctdb_db->db_name));
667                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
668         }
669
670         return 0;
671 }
672
673 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
674                                    TDB_DATA indata,
675                                    TDB_DATA *outdata)
676 {
677         uint32_t db_id = *(uint32_t *)indata.dptr;
678         struct ctdb_db_context *ctdb_db;
679         int ret;
680
681         ctdb_db = find_ctdb_db(ctdb, db_id);
682         if (!ctdb_db) {
683                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
684                 return -1;
685         }
686
687         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
688         if (ret != 0) {
689                 DEBUG(DEBUG_ERR,(__location__
690                                  " ctdb_load_persistent_health(%s) failed\n",
691                                  ctdb_db->db_name));
692                 return -1;
693         }
694
695         *outdata = tdb_null;
696         if (ctdb_db->unhealthy_reason) {
697                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
698                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
699         }
700
701         return 0;
702 }
703
704
705 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
706 {
707         char *ropath;
708
709         if (ctdb_db->readonly) {
710                 return 0;
711         }
712
713         if (ctdb_db->persistent) {
714                 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
715                 return -1;
716         }
717
718         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
719         if (ropath == NULL) {
720                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
721                 return -1;
722         }
723         ctdb_db->rottdb = tdb_open(ropath, 
724                               ctdb->tunable.database_hash_size, 
725                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
726                               O_CREAT|O_RDWR, 0);
727         if (ctdb_db->rottdb == NULL) {
728                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
729                 talloc_free(ropath);
730                 return -1;
731         }
732
733         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
734
735         ctdb_db->readonly = true;
736
737         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
738
739         talloc_free(ropath);
740         return 0;
741 }
742
743 /*
744   attach to a database, handling both persistent and non-persistent databases
745   return 0 on success, -1 on failure
746  */
747 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
748                              bool persistent, const char *unhealthy_reason,
749                              bool jenkinshash, bool mutexes)
750 {
751         struct ctdb_db_context *ctdb_db, *tmp_db;
752         int ret;
753         struct TDB_DATA key;
754         unsigned tdb_flags;
755         int mode = 0600;
756         int remaining_tries = 0;
757
758         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
759         CTDB_NO_MEMORY(ctdb, ctdb_db);
760
761         ctdb_db->priority = 1;
762         ctdb_db->ctdb = ctdb;
763         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
764         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
765
766         key.dsize = strlen(db_name)+1;
767         key.dptr  = discard_const(db_name);
768         ctdb_db->db_id = ctdb_hash(&key);
769         ctdb_db->persistent = persistent;
770
771         if (!ctdb_db->persistent) {
772                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
773                 if (ctdb_db->delete_queue == NULL) {
774                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
775                 }
776
777                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
778         }
779
780         /* check for hash collisions */
781         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
782                 if (tmp_db->db_id == ctdb_db->db_id) {
783                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
784                                  tmp_db->db_id, db_name, tmp_db->db_name));
785                         talloc_free(ctdb_db);
786                         return -1;
787                 }
788         }
789
790         if (persistent) {
791                 if (unhealthy_reason) {
792                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
793                                                             unhealthy_reason, 0);
794                         if (ret != 0) {
795                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
796                                                    ctdb_db->db_name, unhealthy_reason, ret));
797                                 talloc_free(ctdb_db);
798                                 return -1;
799                         }
800                 }
801
802                 if (ctdb->max_persistent_check_errors > 0) {
803                         remaining_tries = 1;
804                 }
805                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
806                         remaining_tries = 0;
807                 }
808
809                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
810                 if (ret != 0) {
811                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
812                                    ctdb_db->db_name, ret));
813                         talloc_free(ctdb_db);
814                         return -1;
815                 }
816         }
817
818         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
819                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
820                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
821                 talloc_free(ctdb_db);
822                 return -1;
823         }
824
825         if (ctdb_db->unhealthy_reason) {
826                 /* this is just a warning, but we want that in the log file! */
827                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
828                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
829         }
830
831         /* open the database */
832         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
833                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
834                                            db_name, ctdb->pnn);
835
836         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
837         if (ctdb->valgrinding) {
838                 tdb_flags |= TDB_NOMMAP;
839         }
840         tdb_flags |= TDB_DISALLOW_NESTING;
841         if (jenkinshash) {
842                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
843         }
844 #ifdef TDB_MUTEX_LOCKING
845         if (ctdb->tunable.mutex_enabled && mutexes &&
846             tdb_runtime_check_for_robust_mutexes()) {
847                 tdb_flags |= TDB_MUTEX_LOCKING;
848         }
849 #endif
850
851 again:
852         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
853                                       ctdb->tunable.database_hash_size, 
854                                       tdb_flags, 
855                                       O_CREAT|O_RDWR, mode);
856         if (ctdb_db->ltdb == NULL) {
857                 struct stat st;
858                 int saved_errno = errno;
859
860                 if (!persistent) {
861                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
862                                           ctdb_db->db_path,
863                                           saved_errno,
864                                           strerror(saved_errno)));
865                         talloc_free(ctdb_db);
866                         return -1;
867                 }
868
869                 if (remaining_tries == 0) {
870                         DEBUG(DEBUG_CRIT,(__location__
871                                           "Failed to open persistent tdb '%s': %d - %s\n",
872                                           ctdb_db->db_path,
873                                           saved_errno,
874                                           strerror(saved_errno)));
875                         talloc_free(ctdb_db);
876                         return -1;
877                 }
878
879                 ret = stat(ctdb_db->db_path, &st);
880                 if (ret != 0) {
881                         DEBUG(DEBUG_CRIT,(__location__
882                                           "Failed to open persistent tdb '%s': %d - %s\n",
883                                           ctdb_db->db_path,
884                                           saved_errno,
885                                           strerror(saved_errno)));
886                         talloc_free(ctdb_db);
887                         return -1;
888                 }
889
890                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
891                 if (ret != 0) {
892                         DEBUG(DEBUG_CRIT,(__location__
893                                           "Failed to open persistent tdb '%s': %d - %s\n",
894                                           ctdb_db->db_path,
895                                           saved_errno,
896                                           strerror(saved_errno)));
897                         talloc_free(ctdb_db);
898                         return -1;
899                 }
900
901                 remaining_tries--;
902                 mode = st.st_mode;
903                 goto again;
904         }
905
906         if (!persistent) {
907                 ctdb_check_db_empty(ctdb_db);
908         } else {
909                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
910                 if (ret != 0) {
911                         int fd;
912                         struct stat st;
913
914                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
915                                           ctdb_db->db_path, ret,
916                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
917                         if (remaining_tries == 0) {
918                                 talloc_free(ctdb_db);
919                                 return -1;
920                         }
921
922                         fd = tdb_fd(ctdb_db->ltdb->tdb);
923                         ret = fstat(fd, &st);
924                         if (ret != 0) {
925                                 DEBUG(DEBUG_CRIT,(__location__
926                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
927                                                   ctdb_db->db_path,
928                                                   errno,
929                                                   strerror(errno)));
930                                 talloc_free(ctdb_db);
931                                 return -1;
932                         }
933
934                         /* close the TDB */
935                         talloc_free(ctdb_db->ltdb);
936                         ctdb_db->ltdb = NULL;
937
938                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
939                         if (ret != 0) {
940                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
941                                                   ctdb_db->db_path));
942                                 talloc_free(ctdb_db);
943                                 return -1;
944                         }
945
946                         remaining_tries--;
947                         mode = st.st_mode;
948                         goto again;
949                 }
950         }
951
952         /* set up a rb tree we can use to track which records we have a 
953            fetch-lock in-flight for so we can defer any additional calls
954            for the same record.
955          */
956         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
957         if (ctdb_db->deferred_fetch == NULL) {
958                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
959                 talloc_free(ctdb_db);
960                 return -1;
961         }
962
963         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
964         if (ctdb_db->defer_dmaster == NULL) {
965                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
966                                   ctdb_db->db_name));
967                 talloc_free(ctdb_db);
968                 return -1;
969         }
970
971         DLIST_ADD(ctdb->db_list, ctdb_db);
972
973         /* setting this can help some high churn databases */
974         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
975
976         /* 
977            all databases support the "null" function. we need this in
978            order to do forced migration of records
979         */
980         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
981         if (ret != 0) {
982                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
983                 talloc_free(ctdb_db);
984                 return -1;
985         }
986
987         /* 
988            all databases support the "fetch" function. we need this
989            for efficient Samba3 ctdb fetch
990         */
991         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
992         if (ret != 0) {
993                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
994                 talloc_free(ctdb_db);
995                 return -1;
996         }
997
998         /* 
999            all databases support the "fetch_with_header" function. we need this
1000            for efficient readonly record fetches
1001         */
1002         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1003         if (ret != 0) {
1004                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1005                 talloc_free(ctdb_db);
1006                 return -1;
1007         }
1008
1009         ret = ctdb_vacuum_init(ctdb_db);
1010         if (ret != 0) {
1011                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1012                                   "database '%s'\n", ctdb_db->db_name));
1013                 talloc_free(ctdb_db);
1014                 return -1;
1015         }
1016
1017
1018         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1019                             ctdb_db->db_path, tdb_flags));
1020
1021         /* success */
1022         return 0;
1023 }
1024
1025
/*
  a client attach request that has been put on hold; entries are chained
  on ctdb->deferred_attach
 */
struct ctdb_deferred_attach_context {
	/* list linkage used by the DLIST_* macros */
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	/* daemon context owning the deferred_attach list */
	struct ctdb_context *ctdb;
	/* the held-back attach control packet */
	struct ctdb_req_control *c;
};
1031
1032
1033 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1034 {
1035         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1036
1037         return 0;
1038 }
1039
1040 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1041 {
1042         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1043         struct ctdb_context *ctdb = da_ctx->ctdb;
1044
1045         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1046         talloc_free(da_ctx);
1047 }
1048
1049 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1050 {
1051         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1052         struct ctdb_context *ctdb = da_ctx->ctdb;
1053
1054         /* This talloc-steals the packet ->c */
1055         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1056         talloc_free(da_ctx);
1057 }
1058
1059 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1060 {
1061         struct ctdb_deferred_attach_context *da_ctx;
1062
1063         /* call it from the main event loop as soon as the current event 
1064            finishes.
1065          */
1066         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1067                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1068                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1069         }
1070
1071         return 0;
1072 }
1073
1074 /*
1075   a client has asked to attach a new database
1076  */
1077 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1078                                TDB_DATA *outdata, uint64_t tdb_flags, 
1079                                bool persistent, uint32_t client_id,
1080                                struct ctdb_req_control *c,
1081                                bool *async_reply)
1082 {
1083         const char *db_name = (const char *)indata.dptr;
1084         struct ctdb_db_context *db;
1085         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1086         struct ctdb_client *client = NULL;
1087         bool with_jenkinshash, with_mutexes;
1088
1089         if (ctdb->tunable.allow_client_db_attach == 0) {
1090                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1091                                   "AllowClientDBAccess == 0\n", db_name));
1092                 return -1;
1093         }
1094
1095         /* dont allow any local clients to attach while we are in recovery mode
1096          * except for the recovery daemon.
1097          * allow all attach from the network since these are always from remote
1098          * recovery daemons.
1099          */
1100         if (client_id != 0) {
1101                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1102         }
1103         if (client != NULL) {
1104                 /* If the node is inactive it is not part of the cluster
1105                    and we should not allow clients to attach to any
1106                    databases
1107                 */
1108                 if (node->flags & NODE_FLAGS_INACTIVE) {
1109                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1110                         return -1;
1111                 }
1112
1113                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1114                     client->pid != ctdb->recoverd_pid &&
1115                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1116                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1117
1118                         if (da_ctx == NULL) {
1119                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1120                                 return -1;
1121                         }
1122
1123                         da_ctx->ctdb = ctdb;
1124                         da_ctx->c = talloc_steal(da_ctx, c);
1125                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1126                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1127
1128                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1129
1130                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1131                         *async_reply = true;
1132                         return 0;
1133                 }
1134         }
1135
1136         /* the client can optionally pass additional tdb flags, but we
1137            only allow a subset of those on the database in ctdb. Note
1138            that tdb_flags is passed in via the (otherwise unused)
1139            srvid to the attach control */
1140 #ifdef TDB_MUTEX_LOCKING
1141         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING);
1142 #else
1143         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1144 #endif
1145
1146         /* see if we already have this name */
1147         db = ctdb_db_handle(ctdb, db_name);
1148         if (db) {
1149                 if (db->persistent != persistent) {
1150                         DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1151                                           "database %s\n", persistent ? "" : "non-",
1152                                           db-> persistent ? "" : "non-", db_name));
1153                         return -1;
1154                 }
1155                 outdata->dptr  = (uint8_t *)&db->db_id;
1156                 outdata->dsize = sizeof(db->db_id);
1157                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1158                 return 0;
1159         }
1160
1161         with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1162 #ifdef TDB_MUTEX_LOCKING
1163         with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1164 #else
1165         with_mutexes = false;
1166 #endif
1167
1168         if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1169                               with_jenkinshash, with_mutexes) != 0) {
1170                 return -1;
1171         }
1172
1173         db = ctdb_db_handle(ctdb, db_name);
1174         if (!db) {
1175                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1176                 return -1;
1177         }
1178
1179         /* remember the flags the client has specified */
1180         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1181
1182         outdata->dptr  = (uint8_t *)&db->db_id;
1183         outdata->dsize = sizeof(db->db_id);
1184
1185         /* Try to ensure it's locked in mem */
1186         lockdown_memory(ctdb->valgrinding);
1187
1188         /* tell all the other nodes about this database */
1189         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1190                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1191                                                 CTDB_CONTROL_DB_ATTACH,
1192                                  0, CTDB_CTRL_FLAG_NOREPLY,
1193                                  indata, NULL, NULL);
1194
1195         /* success */
1196         return 0;
1197 }
1198
1199 /*
1200  * a client has asked to detach from a database
1201  */
1202 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1203                                uint32_t client_id)
1204 {
1205         uint32_t db_id;
1206         struct ctdb_db_context *ctdb_db;
1207         struct ctdb_client *client = NULL;
1208
1209         db_id = *(uint32_t *)indata.dptr;
1210         ctdb_db = find_ctdb_db(ctdb, db_id);
1211         if (ctdb_db == NULL) {
1212                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1213                                   db_id));
1214                 return -1;
1215         }
1216
1217         if (ctdb->tunable.allow_client_db_attach == 1) {
1218                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1219                                   "Clients are allowed access to databases "
1220                                   "(AllowClientDBAccess == 1)\n",
1221                                   ctdb_db->db_name));
1222                 return -1;
1223         }
1224
1225         if (ctdb_db->persistent) {
1226                 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1227                                   "denied\n", ctdb_db->db_name));
1228                 return -1;
1229         }
1230
1231         /* Cannot detach from database when in recovery */
1232         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1233                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1234                 return -1;
1235         }
1236
1237         /* If a control comes from a client, then broadcast it to all nodes.
1238          * Do the actual detach only if the control comes from other daemons.
1239          */
1240         if (client_id != 0) {
1241                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1242                 if (client != NULL) {
1243                         /* forward the control to all the nodes */
1244                         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1245                                                  CTDB_CONTROL_DB_DETACH, 0,
1246                                                  CTDB_CTRL_FLAG_NOREPLY,
1247                                                  indata, NULL, NULL);
1248                         return 0;
1249                 }
1250                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1251                                   "for database '%s'\n", ctdb_db->db_name));
1252                 return -1;
1253         }
1254
1255         /* Detach database from recoverd */
1256         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1257                                      CTDB_SRVID_DETACH_DATABASE,
1258                                      indata) != 0) {
1259                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1260                 return -1;
1261         }
1262
1263         /* Disable vacuuming and drop all vacuuming data */
1264         talloc_free(ctdb_db->vacuum_handle);
1265         talloc_free(ctdb_db->delete_queue);
1266
1267         /* Terminate any deferred fetch */
1268         talloc_free(ctdb_db->deferred_fetch);
1269
1270         /* Terminate any traverses */
1271         while (ctdb_db->traverse) {
1272                 talloc_free(ctdb_db->traverse);
1273         }
1274
1275         /* Terminate any revokes */
1276         while (ctdb_db->revokechild_active) {
1277                 talloc_free(ctdb_db->revokechild_active);
1278         }
1279
1280         /* Free readonly tracking database */
1281         if (ctdb_db->readonly) {
1282                 talloc_free(ctdb_db->rottdb);
1283         }
1284
1285         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1286
1287         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1288                              ctdb_db->db_name));
1289         talloc_free(ctdb_db);
1290
1291         return 0;
1292 }
1293
1294 /*
1295   attach to all existing persistent databases
1296  */
1297 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1298                                   const char *unhealthy_reason)
1299 {
1300         DIR *d;
1301         struct dirent *de;
1302
1303         /* open the persistent db directory and scan it for files */
1304         d = opendir(ctdb->db_directory_persistent);
1305         if (d == NULL) {
1306                 return 0;
1307         }
1308
1309         while ((de=readdir(d))) {
1310                 char *p, *s, *q;
1311                 size_t len = strlen(de->d_name);
1312                 uint32_t node;
1313                 int invalid_name = 0;
1314                 
1315                 s = talloc_strdup(ctdb, de->d_name);
1316                 if (s == NULL) {
1317                         closedir(d);
1318                         CTDB_NO_MEMORY(ctdb, s);
1319                 }
1320
1321                 /* only accept names ending in .tdb */
1322                 p = strstr(s, ".tdb.");
1323                 if (len < 7 || p == NULL) {
1324                         talloc_free(s);
1325                         continue;
1326                 }
1327
1328                 /* only accept names ending with .tdb. and any number of digits */
1329                 q = p+5;
1330                 while (*q != 0 && invalid_name == 0) {
1331                         if (!isdigit(*q++)) {
1332                                 invalid_name = 1;
1333                         }
1334                 }
1335                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1336                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1337                         talloc_free(s);
1338                         continue;
1339                 }
1340                 p[4] = 0;
1341
1342                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1343                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1344                         closedir(d);
1345                         talloc_free(s);
1346                         return -1;
1347                 }
1348
1349                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1350
1351                 talloc_free(s);
1352         }
1353         closedir(d);
1354         return 0;
1355 }
1356
1357 int ctdb_attach_databases(struct ctdb_context *ctdb)
1358 {
1359         int ret;
1360         char *persistent_health_path = NULL;
1361         char *unhealthy_reason = NULL;
1362         bool first_try = true;
1363
1364         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1365                                                  ctdb->db_directory_state,
1366                                                  PERSISTENT_HEALTH_TDB,
1367                                                  ctdb->pnn);
1368         if (persistent_health_path == NULL) {
1369                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1370                 return -1;
1371         }
1372
1373 again:
1374
1375         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1376                                                    0, TDB_DISALLOW_NESTING,
1377                                                    O_CREAT | O_RDWR, 0600);
1378         if (ctdb->db_persistent_health == NULL) {
1379                 struct tdb_wrap *tdb;
1380
1381                 if (!first_try) {
1382                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1383                                           persistent_health_path,
1384                                           errno,
1385                                           strerror(errno)));
1386                         talloc_free(persistent_health_path);
1387                         talloc_free(unhealthy_reason);
1388                         return -1;
1389                 }
1390                 first_try = false;
1391
1392                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1393                                                    persistent_health_path,
1394                                                    "was cleared after a failure",
1395                                                    "manual verification needed");
1396                 if (unhealthy_reason == NULL) {
1397                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1398                         talloc_free(persistent_health_path);
1399                         return -1;
1400                 }
1401
1402                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1403                                   persistent_health_path));
1404                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1405                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1406                                     O_CREAT | O_RDWR, 0600);
1407                 if (tdb) {
1408                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1409                                           persistent_health_path,
1410                                           errno,
1411                                           strerror(errno)));
1412                         talloc_free(persistent_health_path);
1413                         talloc_free(unhealthy_reason);
1414                         return -1;
1415                 }
1416
1417                 talloc_free(tdb);
1418                 goto again;
1419         }
1420         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1421         if (ret != 0) {
1422                 struct tdb_wrap *tdb;
1423
1424                 talloc_free(ctdb->db_persistent_health);
1425                 ctdb->db_persistent_health = NULL;
1426
1427                 if (!first_try) {
1428                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1429                                           persistent_health_path));
1430                         talloc_free(persistent_health_path);
1431                         talloc_free(unhealthy_reason);
1432                         return -1;
1433                 }
1434                 first_try = false;
1435
1436                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1437                                                    persistent_health_path,
1438                                                    "was cleared after a failure",
1439                                                    "manual verification needed");
1440                 if (unhealthy_reason == NULL) {
1441                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1442                         talloc_free(persistent_health_path);
1443                         return -1;
1444                 }
1445
1446                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1447                                   persistent_health_path));
1448                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1449                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1450                                     O_CREAT | O_RDWR, 0600);
1451                 if (tdb) {
1452                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1453                                           persistent_health_path,
1454                                           errno,
1455                                           strerror(errno)));
1456                         talloc_free(persistent_health_path);
1457                         talloc_free(unhealthy_reason);
1458                         return -1;
1459                 }
1460
1461                 talloc_free(tdb);
1462                 goto again;
1463         }
1464         talloc_free(persistent_health_path);
1465
1466         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1467         talloc_free(unhealthy_reason);
1468         if (ret != 0) {
1469                 return ret;
1470         }
1471
1472         return 0;
1473 }
1474
1475 /*
1476   called when a broadcast seqnum update comes in
1477  */
1478 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1479 {
1480         struct ctdb_db_context *ctdb_db;
1481         if (srcnode == ctdb->pnn) {
1482                 /* don't update ourselves! */
1483                 return 0;
1484         }
1485
1486         ctdb_db = find_ctdb_db(ctdb, db_id);
1487         if (!ctdb_db) {
1488                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1489                 return -1;
1490         }
1491
1492         if (ctdb_db->unhealthy_reason) {
1493                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1494                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1495                 return -1;
1496         }
1497
1498         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1499         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1500         return 0;
1501 }
1502
1503 /*
1504   timer to check for seqnum changes in a ltdb and propogate them
1505  */
1506 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1507                                    struct timeval t, void *p)
1508 {
1509         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1510         struct ctdb_context *ctdb = ctdb_db->ctdb;
1511         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1512         if (new_seqnum != ctdb_db->seqnum) {
1513                 /* something has changed - propogate it */
1514                 TDB_DATA data;
1515                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1516                 data.dsize = sizeof(uint32_t);
1517                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1518                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1519                                          data, NULL, NULL);             
1520         }
1521         ctdb_db->seqnum = new_seqnum;
1522
1523         /* setup a new timer */
1524         ctdb_db->seqnum_update =
1525                 event_add_timed(ctdb->ev, ctdb_db, 
1526                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1527                                 ctdb_ltdb_seqnum_check, ctdb_db);
1528 }
1529
1530 /*
1531   enable seqnum handling on this db
1532  */
1533 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1534 {
1535         struct ctdb_db_context *ctdb_db;
1536         ctdb_db = find_ctdb_db(ctdb, db_id);
1537         if (!ctdb_db) {
1538                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1539                 return -1;
1540         }
1541
1542         if (ctdb_db->seqnum_update == NULL) {
1543                 ctdb_db->seqnum_update =
1544                         event_add_timed(ctdb->ev, ctdb_db, 
1545                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1546                                         ctdb_ltdb_seqnum_check, ctdb_db);
1547         }
1548
1549         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1550         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1551         return 0;
1552 }
1553
1554 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata,
1555                                      uint32_t client_id)
1556 {
1557         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1558         struct ctdb_db_context *ctdb_db;
1559
1560         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1561         if (!ctdb_db) {
1562                 if (!(ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE)) {
1563                         DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
1564                                          db_prio->db_id));
1565                 }
1566                 return 0;
1567         }
1568
1569         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1570                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1571                 return 0;
1572         }
1573
1574         ctdb_db->priority = db_prio->priority;
1575         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1576
1577         if (client_id != 0) {
1578                 /* Broadcast the update to the rest of the cluster */
1579                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1580                                          CTDB_CONTROL_SET_DB_PRIORITY, 0,
1581                                          CTDB_CTRL_FLAG_NOREPLY, indata,
1582                                          NULL, NULL);
1583         }
1584         return 0;
1585 }
1586
1587
1588 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1589 {
1590         if (ctdb_db->sticky) {
1591                 return 0;
1592         }
1593
1594         if (ctdb_db->persistent) {
1595                 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1596                 return -1;
1597         }
1598
1599         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1600
1601         ctdb_db->sticky = true;
1602
1603         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1604
1605         return 0;
1606 }
1607
1608 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1609                                 uint32_t db_id,
1610                                 TDB_DATA *outdata)
1611 {
1612         struct ctdb_db_context *ctdb_db;
1613         struct ctdb_db_statistics *stats;
1614         int i;
1615         int len;
1616         char *ptr;
1617
1618         ctdb_db = find_ctdb_db(ctdb, db_id);
1619         if (!ctdb_db) {
1620                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1621                 return -1;
1622         }
1623
1624         len = offsetof(struct ctdb_db_statistics, hot_keys_wire);
1625         for (i = 0; i < MAX_HOT_KEYS; i++) {
1626                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1627         }
1628
1629         stats = talloc_size(outdata, len);
1630         if (stats == NULL) {
1631                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1632                 return -1;
1633         }
1634
1635         *stats = ctdb_db->statistics;
1636
1637         stats->num_hot_keys = MAX_HOT_KEYS;
1638
1639         ptr = &stats->hot_keys_wire[0];
1640         for (i = 0; i < MAX_HOT_KEYS; i++) {
1641                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1642                        ctdb_db->statistics.hot_keys[i].key.dsize);
1643                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1644         }
1645
1646         outdata->dptr  = (uint8_t *)stats;
1647         outdata->dsize = len;
1648
1649         return 0;
1650 }