ctdb-daemon: Delete empty records from persistent database
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
45
46 /**
47  * write a record to a normal database
48  *
49  * This is the server-variant of the ctdb_ltdb_store function.
50  * It contains logic to determine whether a record should be
51  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52  * controls to the local ctdb daemon if apporpriate.
53  */
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
55                                   TDB_DATA key,
56                                   struct ctdb_ltdb_header *header,
57                                   TDB_DATA data)
58 {
59         struct ctdb_context *ctdb = ctdb_db->ctdb;
60         TDB_DATA rec[2];
61         uint32_t hsize = sizeof(struct ctdb_ltdb_header);
62         int ret;
63         bool seqnum_suppressed = false;
64         bool keep = false;
65         bool schedule_for_deletion = false;
66         bool remove_from_delete_queue = false;
67         uint32_t lmaster;
68
69         if (ctdb->flags & CTDB_FLAG_TORTURE) {
70                 TDB_DATA old;
71                 struct ctdb_ltdb_header *h2;
72
73                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74                 h2 = (struct ctdb_ltdb_header *)old.dptr;
75                 if (old.dptr != NULL &&
76                     old.dsize >= hsize &&
77                     h2->rsn > header->rsn) {
78                         DEBUG(DEBUG_ERR,
79                               ("RSN regression! %"PRIu64" %"PRIu64"\n",
80                                h2->rsn, header->rsn));
81                 }
82                 if (old.dptr) {
83                         free(old.dptr);
84                 }
85         }
86
87         if (ctdb->vnn_map == NULL) {
88                 /*
89                  * Called from a client: always store the record
90                  * Also don't call ctdb_lmaster since it uses the vnn_map!
91                  */
92                 keep = true;
93                 goto store;
94         }
95
96         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
97
98         /*
99          * If we migrate an empty record off to another node
100          * and the record has not been migrated with data,
101          * delete the record instead of storing the empty record.
102          */
103         if (data.dsize != 0) {
104                 keep = true;
105         } else if (header->flags & CTDB_REC_RO_FLAGS) {
106                 keep = true;
107         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
108                 /*
109                  * The record is not created by the client but
110                  * automatically by the ctdb_ltdb_fetch logic that
111                  * creates a record with an initial header in the
112                  * ltdb before trying to migrate the record from
113                  * the current lmaster. Keep it instead of trying
114                  * to delete the non-existing record...
115                  */
116                 keep = true;
117                 schedule_for_deletion = true;
118         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
119                 keep = true;
120         } else if (ctdb_db->ctdb->pnn == lmaster) {
121                 /*
122                  * If we are lmaster, then we usually keep the record.
123                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
124                  * and the record is empty and has never been migrated
125                  * with data, then we should delete it instead of storing it.
126                  * This is part of the vacuuming process.
127                  *
128                  * The reason that we usually need to store even empty records
129                  * on the lmaster is that a client operating directly on the
130                  * lmaster (== dmaster) expects the local copy of the record to
131                  * exist after successful ctdb migrate call. If the record does
132                  * not exist, the client goes into a migrate loop and eventually
133                  * fails. So storing the empty record makes sure that we do not
134                  * need to change the client code.
135                  */
136                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
137                         keep = true;
138                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
139                         keep = true;
140                 }
141         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
142                 keep = true;
143         }
144
145         if (keep) {
146                 if (!ctdb_db->persistent &&
147                     (ctdb_db->ctdb->pnn == header->dmaster) &&
148                     !(header->flags & CTDB_REC_RO_FLAGS))
149                 {
150                         header->rsn++;
151
152                         if (data.dsize == 0) {
153                                 schedule_for_deletion = true;
154                         }
155                 }
156                 remove_from_delete_queue = !schedule_for_deletion;
157         }
158
159 store:
160         /*
161          * The VACUUM_MIGRATED flag is only set temporarily for
162          * the above logic when the record was retrieved by a
163          * VACUUM_MIGRATE call and should not be stored in the
164          * database.
165          *
166          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
167          * and there are two cases in which the corresponding record
168          * is stored in the local database:
169          * 1. The record has been migrated with data in the past
170          *    (the MIGRATED_WITH_DATA record flag is set).
171          * 2. The record has been filled with data again since it
172          *    had been submitted in the VACUUM_FETCH message to the
173          *    lmaster.
174          * For such records it is important to not store the
175          * VACUUM_MIGRATED flag in the database.
176          */
177         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
178
179         /*
180          * Similarly, clear the AUTOMATIC flag which should not enter
181          * the local database copy since this would require client
182          * modifications to clear the flag when the client stores
183          * the record.
184          */
185         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
186
187         rec[0].dsize = hsize;
188         rec[0].dptr = (uint8_t *)header;
189
190         rec[1].dsize = data.dsize;
191         rec[1].dptr = data.dptr;
192
193         /* Databases with seqnum updates enabled only get their seqnum
194            changes when/if we modify the data */
195         if (ctdb_db->seqnum_update != NULL) {
196                 TDB_DATA old;
197                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
198
199                 if ((old.dsize == hsize + data.dsize) &&
200                     memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
201                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
202                         seqnum_suppressed = true;
203                 }
204                 if (old.dptr != NULL) {
205                         free(old.dptr);
206                 }
207         }
208
209         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
210                             ctdb_db->db_name,
211                             keep?"storing":"deleting",
212                             ctdb_hash(&key)));
213
214         if (keep) {
215                 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
216         } else {
217                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
218         }
219
220         if (ret != 0) {
221                 int lvl = DEBUG_ERR;
222
223                 if (keep == false &&
224                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
225                 {
226                         lvl = DEBUG_DEBUG;
227                 }
228
229                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
230                             "%d - %s\n",
231                             ctdb_db->db_name,
232                             keep?"store":"delete", ret,
233                             tdb_errorstr(ctdb_db->ltdb->tdb)));
234
235                 schedule_for_deletion = false;
236                 remove_from_delete_queue = false;
237         }
238         if (seqnum_suppressed) {
239                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
240         }
241
242         if (schedule_for_deletion) {
243                 int ret2;
244                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
245                 if (ret2 != 0) {
246                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
247                 }
248         }
249
250         if (remove_from_delete_queue) {
251                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
252         }
253
254         return ret;
255 }
256
/*
  state kept for a request that is deferred until its chainlock is
  available (see ctdb_ltdb_lock_requeue / lock_fetch_callback)
 */
struct lock_fetch_state {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* function the deferred packet is handed back to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	/* first argument passed to recv_pkt */
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* ctdb_db->generation at the time the request was deferred */
	uint32_t generation;
	/* if true, requeue even when the generation has changed */
	bool ignore_generation;
};
266
267 /*
268   called when we should retry the operation
269  */
270 static void lock_fetch_callback(void *p, bool locked)
271 {
272         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
273         if (!state->ignore_generation &&
274             state->generation != state->ctdb_db->generation) {
275                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
276                 talloc_free(state->hdr);
277                 return;
278         }
279         state->recv_pkt(state->recv_context, state->hdr);
280         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
281 }
282
283
284 /*
285   do a non-blocking ltdb_lock, deferring this ctdb request until we
286   have the chainlock
287
288   It does the following:
289
290    1) tries to get the chainlock. If it succeeds, then it returns 0
291
292    2) if it fails to get a chainlock immediately then it sets up a
293    non-blocking chainlock via ctdb_lock_record, and when it gets the
294    chainlock it re-submits this ctdb request to the main packet
295    receive function.
296
297    This effectively queues all ctdb requests that cannot be
298    immediately satisfied until it can get the lock. This means that
299    the main ctdb daemon will not block waiting for a chainlock held by
300    a client
301
302    There are 3 possible return values:
303
304        0:    means that it got the lock immediately.
305       -1:    means that it failed to get the lock, and won't retry
306       -2:    means that it failed to get the lock immediately, but will retry
307  */
308 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
309                            TDB_DATA key, struct ctdb_req_header *hdr,
310                            void (*recv_pkt)(void *, struct ctdb_req_header *),
311                            void *recv_context, bool ignore_generation)
312 {
313         int ret;
314         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
315         struct lock_request *lreq;
316         struct lock_fetch_state *state;
317         
318         ret = tdb_chainlock_nonblock(tdb, key);
319
320         if (ret != 0 &&
321             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
322                 /* a hard failure - don't try again */
323                 return -1;
324         }
325
326         /* when torturing, ensure we test the contended path */
327         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
328             random() % 5 == 0) {
329                 ret = -1;
330                 tdb_chainunlock(tdb, key);
331         }
332
333         /* first the non-contended path */
334         if (ret == 0) {
335                 return 0;
336         }
337
338         state = talloc(hdr, struct lock_fetch_state);
339         state->ctdb = ctdb_db->ctdb;
340         state->ctdb_db = ctdb_db;
341         state->hdr = hdr;
342         state->recv_pkt = recv_pkt;
343         state->recv_context = recv_context;
344         state->generation = ctdb_db->generation;
345         state->ignore_generation = ignore_generation;
346
347         /* now the contended path */
348         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
349         if (lreq == NULL) {
350                 return -1;
351         }
352
353         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
354            so it won't be freed yet */
355         talloc_steal(state, hdr);
356
357         /* now tell the caller than we will retry asynchronously */
358         return -2;
359 }
360
361 /*
362   a varient of ctdb_ltdb_lock_requeue that also fetches the record
363  */
364 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
365                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
366                                  struct ctdb_req_header *hdr, TDB_DATA *data,
367                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
368                                  void *recv_context, bool ignore_generation)
369 {
370         int ret;
371
372         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
373                                      recv_context, ignore_generation);
374         if (ret == 0) {
375                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
376                 if (ret != 0) {
377                         int uret;
378                         uret = ctdb_ltdb_unlock(ctdb_db, key);
379                         if (uret != 0) {
380                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
381                         }
382                 }
383         }
384         return ret;
385 }
386
387
388 /*
389   paraoid check to see if the db is empty
390  */
391 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
392 {
393         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
394         int count = tdb_traverse_read(tdb, NULL, NULL);
395         if (count != 0) {
396                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
397                          ctdb_db->db_path));
398                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
399         }
400 }
401
402 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
403                                 struct ctdb_db_context *ctdb_db)
404 {
405         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
406         char *old;
407         char *reason = NULL;
408         TDB_DATA key;
409         TDB_DATA val;
410
411         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
412         key.dsize = strlen(ctdb_db->db_name);
413
414         old = ctdb_db->unhealthy_reason;
415         ctdb_db->unhealthy_reason = NULL;
416
417         val = tdb_fetch(tdb, key);
418         if (val.dsize > 0) {
419                 reason = talloc_strndup(ctdb_db,
420                                         (const char *)val.dptr,
421                                         val.dsize);
422                 if (reason == NULL) {
423                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
424                                            (int)val.dsize));
425                         ctdb_db->unhealthy_reason = old;
426                         free(val.dptr);
427                         return -1;
428                 }
429         }
430
431         if (val.dptr) {
432                 free(val.dptr);
433         }
434
435         talloc_free(old);
436         ctdb_db->unhealthy_reason = reason;
437         return 0;
438 }
439
440 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
441                                   struct ctdb_db_context *ctdb_db,
442                                   const char *given_reason,/* NULL means healthy */
443                                   int num_healthy_nodes)
444 {
445         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
446         int ret;
447         TDB_DATA key;
448         TDB_DATA val;
449         char *new_reason = NULL;
450         char *old_reason = NULL;
451
452         ret = tdb_transaction_start(tdb);
453         if (ret != 0) {
454                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
455                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
456                 return -1;
457         }
458
459         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
460         if (ret != 0) {
461                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
462                                    ctdb_db->db_name, ret));
463                 return -1;
464         }
465         old_reason = ctdb_db->unhealthy_reason;
466
467         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
468         key.dsize = strlen(ctdb_db->db_name);
469
470         if (given_reason) {
471                 new_reason = talloc_strdup(ctdb_db, given_reason);
472                 if (new_reason == NULL) {
473                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
474                                           given_reason));
475                         return -1;
476                 }
477         } else if (old_reason && num_healthy_nodes == 0) {
478                 /*
479                  * If the reason indicates ok, but there where no healthy nodes
480                  * available, that it means, we have not recovered valid content
481                  * of the db. So if there's an old reason, prefix it with
482                  * "NO-HEALTHY-NODES - "
483                  */
484                 const char *prefix;
485
486 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
487                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
488                 if (ret != 0) {
489                         prefix = _TMP_PREFIX;
490                 } else {
491                         prefix = "";
492                 }
493                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
494                                          prefix, old_reason);
495                 if (new_reason == NULL) {
496                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
497                                           prefix, old_reason));
498                         return -1;
499                 }
500 #undef _TMP_PREFIX
501         }
502
503         if (new_reason) {
504                 val.dptr = discard_const_p(uint8_t, new_reason);
505                 val.dsize = strlen(new_reason);
506
507                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
508                 if (ret != 0) {
509                         tdb_transaction_cancel(tdb);
510                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
511                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
512                                            ret, tdb_errorstr(tdb)));
513                         talloc_free(new_reason);
514                         return -1;
515                 }
516                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
517                                    ctdb_db->db_name, new_reason));
518         } else if (old_reason) {
519                 ret = tdb_delete(tdb, key);
520                 if (ret != 0) {
521                         tdb_transaction_cancel(tdb);
522                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
523                                            tdb_name(tdb), ctdb_db->db_name,
524                                            ret, tdb_errorstr(tdb)));
525                         talloc_free(new_reason);
526                         return -1;
527                 }
528                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
529                                    ctdb_db->db_name));
530         }
531
532         ret = tdb_transaction_commit(tdb);
533         if (ret != TDB_SUCCESS) {
534                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
535                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
536                 talloc_free(new_reason);
537                 return -1;
538         }
539
540         talloc_free(old_reason);
541         ctdb_db->unhealthy_reason = new_reason;
542
543         return 0;
544 }
545
546 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
547                                      struct ctdb_db_context *ctdb_db)
548 {
549         time_t now = time(NULL);
550         char *new_path;
551         char *new_reason;
552         int ret;
553         struct tm *tm;
554
555         tm = gmtime(&now);
556
557         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
558         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
559                                    "%04u%02u%02u%02u%02u%02u.0Z",
560                                    ctdb_db->db_path,
561                                    tm->tm_year+1900, tm->tm_mon+1,
562                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
563                                    tm->tm_sec);
564         if (new_path == NULL) {
565                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
566                 return -1;
567         }
568
569         new_reason = talloc_asprintf(ctdb_db,
570                                      "ERROR - Backup of corrupted TDB in '%s'",
571                                      new_path);
572         if (new_reason == NULL) {
573                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
574                 return -1;
575         }
576         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
577         talloc_free(new_reason);
578         if (ret != 0) {
579                 DEBUG(DEBUG_CRIT,(__location__
580                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
581                                  ctdb_db->db_path));
582                 return -1;
583         }
584
585         ret = rename(ctdb_db->db_path, new_path);
586         if (ret != 0) {
587                 DEBUG(DEBUG_CRIT,(__location__
588                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
589                                   ctdb_db->db_path, new_path,
590                                   errno, strerror(errno)));
591                 talloc_free(new_path);
592                 return -1;
593         }
594
595         DEBUG(DEBUG_CRIT,(__location__
596                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
597                          ctdb_db->db_path, new_path));
598         talloc_free(new_path);
599         return 0;
600 }
601
602 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
603 {
604         struct ctdb_db_context *ctdb_db;
605         int ret;
606         int ok = 0;
607         int fail = 0;
608
609         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
610                 if (!ctdb_db->persistent) {
611                         continue;
612                 }
613
614                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
615                 if (ret != 0) {
616                         DEBUG(DEBUG_ALERT,(__location__
617                                            " load persistent health for '%s' failed\n",
618                                            ctdb_db->db_path));
619                         return -1;
620                 }
621
622                 if (ctdb_db->unhealthy_reason == NULL) {
623                         ok++;
624                         DEBUG(DEBUG_INFO,(__location__
625                                    " persistent db '%s' healthy\n",
626                                    ctdb_db->db_path));
627                         continue;
628                 }
629
630                 fail++;
631                 DEBUG(DEBUG_ALERT,(__location__
632                                    " persistent db '%s' unhealthy: %s\n",
633                                    ctdb_db->db_path,
634                                    ctdb_db->unhealthy_reason));
635         }
636         DEBUG(DEBUG_NOTICE,
637               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
638                ok, fail));
639
640         if (fail != 0) {
641                 return -1;
642         }
643
644         return 0;
645 }
646
647
648 /*
649   mark a database - as healthy
650  */
651 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
652 {
653         uint32_t db_id = *(uint32_t *)indata.dptr;
654         struct ctdb_db_context *ctdb_db;
655         int ret;
656         bool may_recover = false;
657
658         ctdb_db = find_ctdb_db(ctdb, db_id);
659         if (!ctdb_db) {
660                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
661                 return -1;
662         }
663
664         if (ctdb_db->unhealthy_reason) {
665                 may_recover = true;
666         }
667
668         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
669         if (ret != 0) {
670                 DEBUG(DEBUG_ERR,(__location__
671                                  " ctdb_update_persistent_health(%s) failed\n",
672                                  ctdb_db->db_name));
673                 return -1;
674         }
675
676         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
677                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
678                                   ctdb_db->db_name));
679                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
680         }
681
682         return 0;
683 }
684
685 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
686                                    TDB_DATA indata,
687                                    TDB_DATA *outdata)
688 {
689         uint32_t db_id = *(uint32_t *)indata.dptr;
690         struct ctdb_db_context *ctdb_db;
691         int ret;
692
693         ctdb_db = find_ctdb_db(ctdb, db_id);
694         if (!ctdb_db) {
695                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
696                 return -1;
697         }
698
699         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
700         if (ret != 0) {
701                 DEBUG(DEBUG_ERR,(__location__
702                                  " ctdb_load_persistent_health(%s) failed\n",
703                                  ctdb_db->db_name));
704                 return -1;
705         }
706
707         *outdata = tdb_null;
708         if (ctdb_db->unhealthy_reason) {
709                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
710                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
711         }
712
713         return 0;
714 }
715
716
717 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
718 {
719         char *ropath;
720
721         if (ctdb_db->readonly) {
722                 return 0;
723         }
724
725         if (ctdb_db->persistent) {
726                 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
727                 return -1;
728         }
729
730         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
731         if (ropath == NULL) {
732                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
733                 return -1;
734         }
735         ctdb_db->rottdb = tdb_open(ropath, 
736                               ctdb->tunable.database_hash_size, 
737                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
738                               O_CREAT|O_RDWR, 0600);
739         if (ctdb_db->rottdb == NULL) {
740                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
741                 talloc_free(ropath);
742                 return -1;
743         }
744
745         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
746
747         ctdb_db->readonly = true;
748
749         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
750
751         talloc_free(ropath);
752         return 0;
753 }
754
755 /*
756   attach to a database, handling both persistent and non-persistent databases
757   return 0 on success, -1 on failure
758  */
759 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
760                              bool persistent, const char *unhealthy_reason,
761                              bool jenkinshash, bool mutexes)
762 {
763         struct ctdb_db_context *ctdb_db, *tmp_db;
764         int ret;
765         struct TDB_DATA key;
766         unsigned tdb_flags;
767         int mode = 0600;
768         int remaining_tries = 0;
769
770         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
771         CTDB_NO_MEMORY(ctdb, ctdb_db);
772
773         ctdb_db->ctdb = ctdb;
774         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
775         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
776
777         key.dsize = strlen(db_name)+1;
778         key.dptr  = discard_const(db_name);
779         ctdb_db->db_id = ctdb_hash(&key);
780         ctdb_db->persistent = persistent;
781
782         if (!ctdb_db->persistent) {
783                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
784                 if (ctdb_db->delete_queue == NULL) {
785                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
786                 }
787
788                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
789         }
790
791         /* check for hash collisions */
792         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
793                 if (tmp_db->db_id == ctdb_db->db_id) {
794                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
795                                  tmp_db->db_id, db_name, tmp_db->db_name));
796                         talloc_free(ctdb_db);
797                         return -1;
798                 }
799         }
800
801         if (persistent) {
802                 if (unhealthy_reason) {
803                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
804                                                             unhealthy_reason, 0);
805                         if (ret != 0) {
806                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
807                                                    ctdb_db->db_name, unhealthy_reason, ret));
808                                 talloc_free(ctdb_db);
809                                 return -1;
810                         }
811                 }
812
813                 if (ctdb->max_persistent_check_errors > 0) {
814                         remaining_tries = 1;
815                 }
816                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
817                         remaining_tries = 0;
818                 }
819
820                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
821                 if (ret != 0) {
822                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
823                                    ctdb_db->db_name, ret));
824                         talloc_free(ctdb_db);
825                         return -1;
826                 }
827         }
828
829         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
830                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
831                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
832                 talloc_free(ctdb_db);
833                 return -1;
834         }
835
836         if (ctdb_db->unhealthy_reason) {
837                 /* this is just a warning, but we want that in the log file! */
838                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
839                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
840         }
841
842         /* open the database */
843         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
844                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
845                                            db_name, ctdb->pnn);
846
847         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
848         if (ctdb->valgrinding) {
849                 tdb_flags |= TDB_NOMMAP;
850         }
851         tdb_flags |= TDB_DISALLOW_NESTING;
852         if (jenkinshash) {
853                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
854         }
855 #ifdef TDB_MUTEX_LOCKING
856         if (ctdb->tunable.mutex_enabled && mutexes &&
857             tdb_runtime_check_for_robust_mutexes()) {
858                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
859         }
860 #endif
861
862 again:
863         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
864                                       ctdb->tunable.database_hash_size, 
865                                       tdb_flags, 
866                                       O_CREAT|O_RDWR, mode);
867         if (ctdb_db->ltdb == NULL) {
868                 struct stat st;
869                 int saved_errno = errno;
870
871                 if (!persistent) {
872                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
873                                           ctdb_db->db_path,
874                                           saved_errno,
875                                           strerror(saved_errno)));
876                         talloc_free(ctdb_db);
877                         return -1;
878                 }
879
880                 if (remaining_tries == 0) {
881                         DEBUG(DEBUG_CRIT,(__location__
882                                           "Failed to open persistent tdb '%s': %d - %s\n",
883                                           ctdb_db->db_path,
884                                           saved_errno,
885                                           strerror(saved_errno)));
886                         talloc_free(ctdb_db);
887                         return -1;
888                 }
889
890                 ret = stat(ctdb_db->db_path, &st);
891                 if (ret != 0) {
892                         DEBUG(DEBUG_CRIT,(__location__
893                                           "Failed to open persistent tdb '%s': %d - %s\n",
894                                           ctdb_db->db_path,
895                                           saved_errno,
896                                           strerror(saved_errno)));
897                         talloc_free(ctdb_db);
898                         return -1;
899                 }
900
901                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
902                 if (ret != 0) {
903                         DEBUG(DEBUG_CRIT,(__location__
904                                           "Failed to open persistent tdb '%s': %d - %s\n",
905                                           ctdb_db->db_path,
906                                           saved_errno,
907                                           strerror(saved_errno)));
908                         talloc_free(ctdb_db);
909                         return -1;
910                 }
911
912                 remaining_tries--;
913                 mode = st.st_mode;
914                 goto again;
915         }
916
917         if (!persistent) {
918                 ctdb_check_db_empty(ctdb_db);
919         } else {
920                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
921                 if (ret != 0) {
922                         int fd;
923                         struct stat st;
924
925                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
926                                           ctdb_db->db_path, ret,
927                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
928                         if (remaining_tries == 0) {
929                                 talloc_free(ctdb_db);
930                                 return -1;
931                         }
932
933                         fd = tdb_fd(ctdb_db->ltdb->tdb);
934                         ret = fstat(fd, &st);
935                         if (ret != 0) {
936                                 DEBUG(DEBUG_CRIT,(__location__
937                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
938                                                   ctdb_db->db_path,
939                                                   errno,
940                                                   strerror(errno)));
941                                 talloc_free(ctdb_db);
942                                 return -1;
943                         }
944
945                         /* close the TDB */
946                         talloc_free(ctdb_db->ltdb);
947                         ctdb_db->ltdb = NULL;
948
949                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
950                         if (ret != 0) {
951                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
952                                                   ctdb_db->db_path));
953                                 talloc_free(ctdb_db);
954                                 return -1;
955                         }
956
957                         remaining_tries--;
958                         mode = st.st_mode;
959                         goto again;
960                 }
961         }
962
963         /* set up a rb tree we can use to track which records we have a 
964            fetch-lock in-flight for so we can defer any additional calls
965            for the same record.
966          */
967         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
968         if (ctdb_db->deferred_fetch == NULL) {
969                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
970                 talloc_free(ctdb_db);
971                 return -1;
972         }
973
974         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
975         if (ctdb_db->defer_dmaster == NULL) {
976                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
977                                   ctdb_db->db_name));
978                 talloc_free(ctdb_db);
979                 return -1;
980         }
981
982         DLIST_ADD(ctdb->db_list, ctdb_db);
983
984         /* setting this can help some high churn databases */
985         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
986
987         /* 
988            all databases support the "null" function. we need this in
989            order to do forced migration of records
990         */
991         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
992         if (ret != 0) {
993                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
994                 talloc_free(ctdb_db);
995                 return -1;
996         }
997
998         /* 
999            all databases support the "fetch" function. we need this
1000            for efficient Samba3 ctdb fetch
1001         */
1002         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1003         if (ret != 0) {
1004                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1005                 talloc_free(ctdb_db);
1006                 return -1;
1007         }
1008
1009         /* 
1010            all databases support the "fetch_with_header" function. we need this
1011            for efficient readonly record fetches
1012         */
1013         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1014         if (ret != 0) {
1015                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1016                 talloc_free(ctdb_db);
1017                 return -1;
1018         }
1019
1020         ret = ctdb_vacuum_init(ctdb_db);
1021         if (ret != 0) {
1022                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1023                                   "database '%s'\n", ctdb_db->db_name));
1024                 talloc_free(ctdb_db);
1025                 return -1;
1026         }
1027
1028         ret = ctdb_migration_init(ctdb_db);
1029         if (ret != 0) {
1030                 DEBUG(DEBUG_ERR,
1031                       ("Failed to setup migration tracking for db '%s'\n",
1032                        ctdb_db->db_name));
1033                 talloc_free(ctdb_db);
1034                 return -1;
1035         }
1036
1037         ctdb_db->generation = ctdb->vnn_map->generation;
1038
1039         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1040                             ctdb_db->db_path, tdb_flags));
1041
1042         /* success */
1043         return 0;
1044 }
1045
1046
/*
  state for a DB attach control that has been put on hold; instances are
  linked on ctdb->deferred_attach
 */
struct ctdb_deferred_attach_context {
	/* doubly linked list pointers (used by DLIST_ADD/DLIST_REMOVE) */
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	/* daemon context whose deferred_attach list this entry is on */
	struct ctdb_context *ctdb;
	/* the held-back attach control packet */
	struct ctdb_req_control_old *c;
};
1052
1053
1054 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1055 {
1056         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1057
1058         return 0;
1059 }
1060
1061 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1062                                          struct tevent_timer *te,
1063                                          struct timeval t, void *private_data)
1064 {
1065         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1066         struct ctdb_context *ctdb = da_ctx->ctdb;
1067
1068         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1069         talloc_free(da_ctx);
1070 }
1071
1072 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1073                                           struct tevent_timer *te,
1074                                           struct timeval t, void *private_data)
1075 {
1076         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1077         struct ctdb_context *ctdb = da_ctx->ctdb;
1078
1079         /* This talloc-steals the packet ->c */
1080         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1081         talloc_free(da_ctx);
1082 }
1083
1084 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1085 {
1086         struct ctdb_deferred_attach_context *da_ctx;
1087
1088         /* call it from the main event loop as soon as the current event 
1089            finishes.
1090          */
1091         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1092                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1093                 tevent_add_timer(ctdb->ev, da_ctx,
1094                                  timeval_current_ofs(1,0),
1095                                  ctdb_deferred_attach_callback, da_ctx);
1096         }
1097
1098         return 0;
1099 }
1100
1101 /*
1102   a client has asked to attach a new database
1103  */
1104 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1105                                TDB_DATA *outdata, uint64_t tdb_flags, 
1106                                bool persistent, uint32_t client_id,
1107                                struct ctdb_req_control_old *c,
1108                                bool *async_reply)
1109 {
1110         const char *db_name = (const char *)indata.dptr;
1111         struct ctdb_db_context *db;
1112         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1113         struct ctdb_client *client = NULL;
1114         bool with_jenkinshash, with_mutexes;
1115
1116         if (ctdb->tunable.allow_client_db_attach == 0) {
1117                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1118                                   "AllowClientDBAccess == 0\n", db_name));
1119                 return -1;
1120         }
1121
1122         /* don't allow any local clients to attach while we are in recovery mode
1123          * except for the recovery daemon.
1124          * allow all attach from the network since these are always from remote
1125          * recovery daemons.
1126          */
1127         if (client_id != 0) {
1128                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1129         }
1130         if (client != NULL) {
1131                 /* If the node is inactive it is not part of the cluster
1132                    and we should not allow clients to attach to any
1133                    databases
1134                 */
1135                 if (node->flags & NODE_FLAGS_INACTIVE) {
1136                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1137                         return -1;
1138                 }
1139
1140                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1141                     client->pid != ctdb->recoverd_pid &&
1142                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1143                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1144
1145                         if (da_ctx == NULL) {
1146                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1147                                 return -1;
1148                         }
1149
1150                         da_ctx->ctdb = ctdb;
1151                         da_ctx->c = talloc_steal(da_ctx, c);
1152                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1153                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1154
1155                         tevent_add_timer(ctdb->ev, da_ctx,
1156                                          timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1157                                          ctdb_deferred_attach_timeout, da_ctx);
1158
1159                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1160                         *async_reply = true;
1161                         return 0;
1162                 }
1163         }
1164
1165         /* the client can optionally pass additional tdb flags, but we
1166            only allow a subset of those on the database in ctdb. Note
1167            that tdb_flags is passed in via the (otherwise unused)
1168            srvid to the attach control */
1169 #ifdef TDB_MUTEX_LOCKING
1170         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1171 #else
1172         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1173 #endif
1174
1175         /* see if we already have this name */
1176         db = ctdb_db_handle(ctdb, db_name);
1177         if (db) {
1178                 if (db->persistent != persistent) {
1179                         DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1180                                           "database %s\n", persistent ? "" : "non-",
1181                                           db-> persistent ? "" : "non-", db_name));
1182                         return -1;
1183                 }
1184                 outdata->dptr  = (uint8_t *)&db->db_id;
1185                 outdata->dsize = sizeof(db->db_id);
1186                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1187                 return 0;
1188         }
1189
1190         with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1191 #ifdef TDB_MUTEX_LOCKING
1192         with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1193 #else
1194         with_mutexes = false;
1195 #endif
1196
1197         if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1198                               with_jenkinshash, with_mutexes) != 0) {
1199                 return -1;
1200         }
1201
1202         db = ctdb_db_handle(ctdb, db_name);
1203         if (!db) {
1204                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1205                 return -1;
1206         }
1207
1208         /* remember the flags the client has specified */
1209         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1210
1211         outdata->dptr  = (uint8_t *)&db->db_id;
1212         outdata->dsize = sizeof(db->db_id);
1213
1214         /* Try to ensure it's locked in mem */
1215         lockdown_memory(ctdb->valgrinding);
1216
1217         /* tell all the other nodes about this database */
1218         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1219                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1220                                                 CTDB_CONTROL_DB_ATTACH,
1221                                  0, CTDB_CTRL_FLAG_NOREPLY,
1222                                  indata, NULL, NULL);
1223
1224         /* success */
1225         return 0;
1226 }
1227
1228 /*
1229  * a client has asked to detach from a database
1230  */
1231 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1232                                uint32_t client_id)
1233 {
1234         uint32_t db_id;
1235         struct ctdb_db_context *ctdb_db;
1236         struct ctdb_client *client = NULL;
1237
1238         db_id = *(uint32_t *)indata.dptr;
1239         ctdb_db = find_ctdb_db(ctdb, db_id);
1240         if (ctdb_db == NULL) {
1241                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1242                                   db_id));
1243                 return -1;
1244         }
1245
1246         if (ctdb->tunable.allow_client_db_attach == 1) {
1247                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1248                                   "Clients are allowed access to databases "
1249                                   "(AllowClientDBAccess == 1)\n",
1250                                   ctdb_db->db_name));
1251                 return -1;
1252         }
1253
1254         if (ctdb_db->persistent) {
1255                 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1256                                   "denied\n", ctdb_db->db_name));
1257                 return -1;
1258         }
1259
1260         /* Cannot detach from database when in recovery */
1261         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1262                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1263                 return -1;
1264         }
1265
1266         /* If a control comes from a client, then broadcast it to all nodes.
1267          * Do the actual detach only if the control comes from other daemons.
1268          */
1269         if (client_id != 0) {
1270                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1271                 if (client != NULL) {
1272                         /* forward the control to all the nodes */
1273                         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1274                                                  CTDB_CONTROL_DB_DETACH, 0,
1275                                                  CTDB_CTRL_FLAG_NOREPLY,
1276                                                  indata, NULL, NULL);
1277                         return 0;
1278                 }
1279                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1280                                   "for database '%s'\n", ctdb_db->db_name));
1281                 return -1;
1282         }
1283
1284         /* Detach database from recoverd */
1285         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1286                                      CTDB_SRVID_DETACH_DATABASE,
1287                                      indata) != 0) {
1288                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1289                 return -1;
1290         }
1291
1292         /* Disable vacuuming and drop all vacuuming data */
1293         talloc_free(ctdb_db->vacuum_handle);
1294         talloc_free(ctdb_db->delete_queue);
1295
1296         /* Terminate any deferred fetch */
1297         talloc_free(ctdb_db->deferred_fetch);
1298
1299         /* Terminate any traverses */
1300         while (ctdb_db->traverse) {
1301                 talloc_free(ctdb_db->traverse);
1302         }
1303
1304         /* Terminate any revokes */
1305         while (ctdb_db->revokechild_active) {
1306                 talloc_free(ctdb_db->revokechild_active);
1307         }
1308
1309         /* Free readonly tracking database */
1310         if (ctdb_db->readonly) {
1311                 talloc_free(ctdb_db->rottdb);
1312         }
1313
1314         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1315
1316         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1317                              ctdb_db->db_name));
1318         talloc_free(ctdb_db);
1319
1320         return 0;
1321 }
1322
1323 /*
1324   attach to all existing persistent databases
1325  */
1326 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1327                                   const char *unhealthy_reason)
1328 {
1329         DIR *d;
1330         struct dirent *de;
1331
1332         /* open the persistent db directory and scan it for files */
1333         d = opendir(ctdb->db_directory_persistent);
1334         if (d == NULL) {
1335                 return 0;
1336         }
1337
1338         while ((de=readdir(d))) {
1339                 char *p, *s, *q;
1340                 size_t len = strlen(de->d_name);
1341                 uint32_t node;
1342                 int invalid_name = 0;
1343                 
1344                 s = talloc_strdup(ctdb, de->d_name);
1345                 if (s == NULL) {
1346                         closedir(d);
1347                         CTDB_NO_MEMORY(ctdb, s);
1348                 }
1349
1350                 /* only accept names ending in .tdb */
1351                 p = strstr(s, ".tdb.");
1352                 if (len < 7 || p == NULL) {
1353                         talloc_free(s);
1354                         continue;
1355                 }
1356
1357                 /* only accept names ending with .tdb. and any number of digits */
1358                 q = p+5;
1359                 while (*q != 0 && invalid_name == 0) {
1360                         if (!isdigit(*q++)) {
1361                                 invalid_name = 1;
1362                         }
1363                 }
1364                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1365                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1366                         talloc_free(s);
1367                         continue;
1368                 }
1369                 p[4] = 0;
1370
1371                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1372                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1373                         closedir(d);
1374                         talloc_free(s);
1375                         return -1;
1376                 }
1377
1378                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1379
1380                 talloc_free(s);
1381         }
1382         closedir(d);
1383         return 0;
1384 }
1385
1386 int ctdb_attach_databases(struct ctdb_context *ctdb)
1387 {
1388         int ret;
1389         char *persistent_health_path = NULL;
1390         char *unhealthy_reason = NULL;
1391         bool first_try = true;
1392
1393         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1394                                                  ctdb->db_directory_state,
1395                                                  PERSISTENT_HEALTH_TDB,
1396                                                  ctdb->pnn);
1397         if (persistent_health_path == NULL) {
1398                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1399                 return -1;
1400         }
1401
1402 again:
1403
1404         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1405                                                    0, TDB_DISALLOW_NESTING,
1406                                                    O_CREAT | O_RDWR, 0600);
1407         if (ctdb->db_persistent_health == NULL) {
1408                 struct tdb_wrap *tdb;
1409
1410                 if (!first_try) {
1411                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1412                                           persistent_health_path,
1413                                           errno,
1414                                           strerror(errno)));
1415                         talloc_free(persistent_health_path);
1416                         talloc_free(unhealthy_reason);
1417                         return -1;
1418                 }
1419                 first_try = false;
1420
1421                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1422                                                    persistent_health_path,
1423                                                    "was cleared after a failure",
1424                                                    "manual verification needed");
1425                 if (unhealthy_reason == NULL) {
1426                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1427                         talloc_free(persistent_health_path);
1428                         return -1;
1429                 }
1430
1431                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1432                                   persistent_health_path));
1433                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1434                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1435                                     O_CREAT | O_RDWR, 0600);
1436                 if (tdb) {
1437                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1438                                           persistent_health_path,
1439                                           errno,
1440                                           strerror(errno)));
1441                         talloc_free(persistent_health_path);
1442                         talloc_free(unhealthy_reason);
1443                         return -1;
1444                 }
1445
1446                 talloc_free(tdb);
1447                 goto again;
1448         }
1449         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1450         if (ret != 0) {
1451                 struct tdb_wrap *tdb;
1452
1453                 talloc_free(ctdb->db_persistent_health);
1454                 ctdb->db_persistent_health = NULL;
1455
1456                 if (!first_try) {
1457                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1458                                           persistent_health_path));
1459                         talloc_free(persistent_health_path);
1460                         talloc_free(unhealthy_reason);
1461                         return -1;
1462                 }
1463                 first_try = false;
1464
1465                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1466                                                    persistent_health_path,
1467                                                    "was cleared after a failure",
1468                                                    "manual verification needed");
1469                 if (unhealthy_reason == NULL) {
1470                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1471                         talloc_free(persistent_health_path);
1472                         return -1;
1473                 }
1474
1475                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1476                                   persistent_health_path));
1477                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1478                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1479                                     O_CREAT | O_RDWR, 0600);
1480                 if (tdb) {
1481                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1482                                           persistent_health_path,
1483                                           errno,
1484                                           strerror(errno)));
1485                         talloc_free(persistent_health_path);
1486                         talloc_free(unhealthy_reason);
1487                         return -1;
1488                 }
1489
1490                 talloc_free(tdb);
1491                 goto again;
1492         }
1493         talloc_free(persistent_health_path);
1494
1495         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1496         talloc_free(unhealthy_reason);
1497         if (ret != 0) {
1498                 return ret;
1499         }
1500
1501         return 0;
1502 }
1503
1504 /*
1505   called when a broadcast seqnum update comes in
1506  */
1507 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1508 {
1509         struct ctdb_db_context *ctdb_db;
1510         if (srcnode == ctdb->pnn) {
1511                 /* don't update ourselves! */
1512                 return 0;
1513         }
1514
1515         ctdb_db = find_ctdb_db(ctdb, db_id);
1516         if (!ctdb_db) {
1517                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1518                 return -1;
1519         }
1520
1521         if (ctdb_db->unhealthy_reason) {
1522                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1523                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1524                 return -1;
1525         }
1526
1527         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1528         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1529         return 0;
1530 }
1531
1532 /*
1533   timer to check for seqnum changes in a ltdb and propogate them
1534  */
1535 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1536                                    struct tevent_timer *te,
1537                                    struct timeval t, void *p)
1538 {
1539         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1540         struct ctdb_context *ctdb = ctdb_db->ctdb;
1541         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1542         if (new_seqnum != ctdb_db->seqnum) {
1543                 /* something has changed - propogate it */
1544                 TDB_DATA data;
1545                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1546                 data.dsize = sizeof(uint32_t);
1547                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1548                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1549                                          data, NULL, NULL);             
1550         }
1551         ctdb_db->seqnum = new_seqnum;
1552
1553         /* setup a new timer */
1554         ctdb_db->seqnum_update =
1555                 tevent_add_timer(ctdb->ev, ctdb_db,
1556                                  timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1557                                                      (ctdb->tunable.seqnum_interval%1000)*1000),
1558                                  ctdb_ltdb_seqnum_check, ctdb_db);
1559 }
1560
1561 /*
1562   enable seqnum handling on this db
1563  */
1564 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1565 {
1566         struct ctdb_db_context *ctdb_db;
1567         ctdb_db = find_ctdb_db(ctdb, db_id);
1568         if (!ctdb_db) {
1569                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1570                 return -1;
1571         }
1572
1573         if (ctdb_db->seqnum_update == NULL) {
1574                 ctdb_db->seqnum_update = tevent_add_timer(
1575                         ctdb->ev, ctdb_db,
1576                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1577                                             (ctdb->tunable.seqnum_interval%1000)*1000),
1578                         ctdb_ltdb_seqnum_check, ctdb_db);
1579         }
1580
1581         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1582         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1583         return 0;
1584 }
1585
1586 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1587 {
1588         if (ctdb_db->sticky) {
1589                 return 0;
1590         }
1591
1592         if (ctdb_db->persistent) {
1593                 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1594                 return -1;
1595         }
1596
1597         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1598
1599         ctdb_db->sticky = true;
1600
1601         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1602
1603         return 0;
1604 }
1605
1606 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1607 {
1608         struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1609         int i;
1610
1611         for (i=0; i<MAX_HOT_KEYS; i++) {
1612                 if (s->hot_keys[i].key.dsize > 0) {
1613                         talloc_free(s->hot_keys[i].key.dptr);
1614                 }
1615         }
1616
1617         ZERO_STRUCT(ctdb_db->statistics);
1618 }
1619
1620 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1621                                 uint32_t db_id,
1622                                 TDB_DATA *outdata)
1623 {
1624         struct ctdb_db_context *ctdb_db;
1625         struct ctdb_db_statistics_old *stats;
1626         int i;
1627         int len;
1628         char *ptr;
1629
1630         ctdb_db = find_ctdb_db(ctdb, db_id);
1631         if (!ctdb_db) {
1632                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1633                 return -1;
1634         }
1635
1636         len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1637         for (i = 0; i < MAX_HOT_KEYS; i++) {
1638                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1639         }
1640
1641         stats = talloc_size(outdata, len);
1642         if (stats == NULL) {
1643                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1644                 return -1;
1645         }
1646
1647         memcpy(stats, &ctdb_db->statistics,
1648                offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1649
1650         stats->num_hot_keys = MAX_HOT_KEYS;
1651
1652         ptr = &stats->hot_keys_wire[0];
1653         for (i = 0; i < MAX_HOT_KEYS; i++) {
1654                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1655                        ctdb_db->statistics.hot_keys[i].key.dsize);
1656                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1657         }
1658
1659         outdata->dptr  = (uint8_t *)stats;
1660         outdata->dsize = len;
1661
1662         return 0;
1663 }