ctdb-daemon: Once database is attached, do not modify tdb flags
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
45
46 /**
47  * write a record to a normal database
48  *
49  * This is the server-variant of the ctdb_ltdb_store function.
50  * It contains logic to determine whether a record should be
51  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52  * controls to the local ctdb daemon if apporpriate.
53  */
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
55                                   TDB_DATA key,
56                                   struct ctdb_ltdb_header *header,
57                                   TDB_DATA data)
58 {
59         struct ctdb_context *ctdb = ctdb_db->ctdb;
60         TDB_DATA rec[2];
61         uint32_t hsize = sizeof(struct ctdb_ltdb_header);
62         int ret;
63         bool seqnum_suppressed = false;
64         bool keep = false;
65         bool schedule_for_deletion = false;
66         bool remove_from_delete_queue = false;
67         uint32_t lmaster;
68
69         if (ctdb->flags & CTDB_FLAG_TORTURE) {
70                 TDB_DATA old;
71                 struct ctdb_ltdb_header *h2;
72
73                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74                 h2 = (struct ctdb_ltdb_header *)old.dptr;
75                 if (old.dptr != NULL &&
76                     old.dsize >= hsize &&
77                     h2->rsn > header->rsn) {
78                         DEBUG(DEBUG_ERR,
79                               ("RSN regression! %"PRIu64" %"PRIu64"\n",
80                                h2->rsn, header->rsn));
81                 }
82                 if (old.dptr) {
83                         free(old.dptr);
84                 }
85         }
86
87         if (ctdb->vnn_map == NULL) {
88                 /*
89                  * Called from a client: always store the record
90                  * Also don't call ctdb_lmaster since it uses the vnn_map!
91                  */
92                 keep = true;
93                 goto store;
94         }
95
96         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
97
98         /*
99          * If we migrate an empty record off to another node
100          * and the record has not been migrated with data,
101          * delete the record instead of storing the empty record.
102          */
103         if (data.dsize != 0) {
104                 keep = true;
105         } else if (header->flags & CTDB_REC_RO_FLAGS) {
106                 keep = true;
107         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
108                 /*
109                  * The record is not created by the client but
110                  * automatically by the ctdb_ltdb_fetch logic that
111                  * creates a record with an initial header in the
112                  * ltdb before trying to migrate the record from
113                  * the current lmaster. Keep it instead of trying
114                  * to delete the non-existing record...
115                  */
116                 keep = true;
117                 schedule_for_deletion = true;
118         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
119                 keep = true;
120         } else if (ctdb_db->ctdb->pnn == lmaster) {
121                 /*
122                  * If we are lmaster, then we usually keep the record.
123                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
124                  * and the record is empty and has never been migrated
125                  * with data, then we should delete it instead of storing it.
126                  * This is part of the vacuuming process.
127                  *
128                  * The reason that we usually need to store even empty records
129                  * on the lmaster is that a client operating directly on the
130                  * lmaster (== dmaster) expects the local copy of the record to
131                  * exist after successful ctdb migrate call. If the record does
132                  * not exist, the client goes into a migrate loop and eventually
133                  * fails. So storing the empty record makes sure that we do not
134                  * need to change the client code.
135                  */
136                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
137                         keep = true;
138                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
139                         keep = true;
140                 }
141         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
142                 keep = true;
143         }
144
145         if (keep) {
146                 if (!ctdb_db->persistent &&
147                     (ctdb_db->ctdb->pnn == header->dmaster) &&
148                     !(header->flags & CTDB_REC_RO_FLAGS))
149                 {
150                         header->rsn++;
151
152                         if (data.dsize == 0) {
153                                 schedule_for_deletion = true;
154                         }
155                 }
156                 remove_from_delete_queue = !schedule_for_deletion;
157         }
158
159 store:
160         /*
161          * The VACUUM_MIGRATED flag is only set temporarily for
162          * the above logic when the record was retrieved by a
163          * VACUUM_MIGRATE call and should not be stored in the
164          * database.
165          *
166          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
167          * and there are two cases in which the corresponding record
168          * is stored in the local database:
169          * 1. The record has been migrated with data in the past
170          *    (the MIGRATED_WITH_DATA record flag is set).
171          * 2. The record has been filled with data again since it
172          *    had been submitted in the VACUUM_FETCH message to the
173          *    lmaster.
174          * For such records it is important to not store the
175          * VACUUM_MIGRATED flag in the database.
176          */
177         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
178
179         /*
180          * Similarly, clear the AUTOMATIC flag which should not enter
181          * the local database copy since this would require client
182          * modifications to clear the flag when the client stores
183          * the record.
184          */
185         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
186
187         rec[0].dsize = hsize;
188         rec[0].dptr = (uint8_t *)header;
189
190         rec[1].dsize = data.dsize;
191         rec[1].dptr = data.dptr;
192
193         /* Databases with seqnum updates enabled only get their seqnum
194            changes when/if we modify the data */
195         if (ctdb_db->seqnum_update != NULL) {
196                 TDB_DATA old;
197                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
198
199                 if ((old.dsize == hsize + data.dsize) &&
200                     memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
201                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
202                         seqnum_suppressed = true;
203                 }
204                 if (old.dptr != NULL) {
205                         free(old.dptr);
206                 }
207         }
208
209         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
210                             ctdb_db->db_name,
211                             keep?"storing":"deleting",
212                             ctdb_hash(&key)));
213
214         if (keep) {
215                 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
216         } else {
217                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
218         }
219
220         if (ret != 0) {
221                 int lvl = DEBUG_ERR;
222
223                 if (keep == false &&
224                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
225                 {
226                         lvl = DEBUG_DEBUG;
227                 }
228
229                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
230                             "%d - %s\n",
231                             ctdb_db->db_name,
232                             keep?"store":"delete", ret,
233                             tdb_errorstr(ctdb_db->ltdb->tdb)));
234
235                 schedule_for_deletion = false;
236                 remove_from_delete_queue = false;
237         }
238         if (seqnum_suppressed) {
239                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
240         }
241
242         if (schedule_for_deletion) {
243                 int ret2;
244                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
245                 if (ret2 != 0) {
246                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
247                 }
248         }
249
250         if (remove_from_delete_queue) {
251                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
252         }
253
254         return ret;
255 }
256
/*
  state kept for a request that is waiting for a record chainlock;
  filled in by ctdb_ltdb_lock_requeue() and read by lock_fetch_callback()
 */
struct lock_fetch_state {
        /* daemon context, copied from ctdb_db->ctdb */
        struct ctdb_context *ctdb;
        /* database the deferred request operates on */
        struct ctdb_db_context *ctdb_db;
        /* function the packet is handed back to once the lock is held */
        void (*recv_pkt)(void *, struct ctdb_req_header *);
        /* first argument passed to recv_pkt */
        void *recv_context;
        /* the deferred packet */
        struct ctdb_req_header *hdr;
        /* ctdb_db->generation at the time the request was queued */
        uint32_t generation;
        /* if true, re-submit the packet even if the generation changed */
        bool ignore_generation;
};
266
267 /*
268   called when we should retry the operation
269  */
270 static void lock_fetch_callback(void *p, bool locked)
271 {
272         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
273         if (!state->ignore_generation &&
274             state->generation != state->ctdb_db->generation) {
275                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
276                 talloc_free(state->hdr);
277                 return;
278         }
279         state->recv_pkt(state->recv_context, state->hdr);
280         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
281 }
282
283
284 /*
285   do a non-blocking ltdb_lock, deferring this ctdb request until we
286   have the chainlock
287
288   It does the following:
289
290    1) tries to get the chainlock. If it succeeds, then it returns 0
291
292    2) if it fails to get a chainlock immediately then it sets up a
293    non-blocking chainlock via ctdb_lock_record, and when it gets the
294    chainlock it re-submits this ctdb request to the main packet
295    receive function.
296
297    This effectively queues all ctdb requests that cannot be
298    immediately satisfied until it can get the lock. This means that
299    the main ctdb daemon will not block waiting for a chainlock held by
300    a client
301
302    There are 3 possible return values:
303
304        0:    means that it got the lock immediately.
305       -1:    means that it failed to get the lock, and won't retry
306       -2:    means that it failed to get the lock immediately, but will retry
307  */
308 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
309                            TDB_DATA key, struct ctdb_req_header *hdr,
310                            void (*recv_pkt)(void *, struct ctdb_req_header *),
311                            void *recv_context, bool ignore_generation)
312 {
313         int ret;
314         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
315         struct lock_request *lreq;
316         struct lock_fetch_state *state;
317         
318         ret = tdb_chainlock_nonblock(tdb, key);
319
320         if (ret != 0 &&
321             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
322                 /* a hard failure - don't try again */
323                 return -1;
324         }
325
326         /* when torturing, ensure we test the contended path */
327         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
328             random() % 5 == 0) {
329                 ret = -1;
330                 tdb_chainunlock(tdb, key);
331         }
332
333         /* first the non-contended path */
334         if (ret == 0) {
335                 return 0;
336         }
337
338         state = talloc(hdr, struct lock_fetch_state);
339         state->ctdb = ctdb_db->ctdb;
340         state->ctdb_db = ctdb_db;
341         state->hdr = hdr;
342         state->recv_pkt = recv_pkt;
343         state->recv_context = recv_context;
344         state->generation = ctdb_db->generation;
345         state->ignore_generation = ignore_generation;
346
347         /* now the contended path */
348         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
349         if (lreq == NULL) {
350                 return -1;
351         }
352
353         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
354            so it won't be freed yet */
355         talloc_steal(state, hdr);
356
357         /* now tell the caller than we will retry asynchronously */
358         return -2;
359 }
360
361 /*
362   a varient of ctdb_ltdb_lock_requeue that also fetches the record
363  */
364 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
365                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
366                                  struct ctdb_req_header *hdr, TDB_DATA *data,
367                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
368                                  void *recv_context, bool ignore_generation)
369 {
370         int ret;
371
372         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
373                                      recv_context, ignore_generation);
374         if (ret == 0) {
375                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
376                 if (ret != 0) {
377                         int uret;
378                         uret = ctdb_ltdb_unlock(ctdb_db, key);
379                         if (uret != 0) {
380                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
381                         }
382                 }
383         }
384         return ret;
385 }
386
387
388 /*
389   paraoid check to see if the db is empty
390  */
391 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
392 {
393         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
394         int count = tdb_traverse_read(tdb, NULL, NULL);
395         if (count != 0) {
396                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
397                          ctdb_db->db_path));
398                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
399         }
400 }
401
402 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
403                                 struct ctdb_db_context *ctdb_db)
404 {
405         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
406         char *old;
407         char *reason = NULL;
408         TDB_DATA key;
409         TDB_DATA val;
410
411         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
412         key.dsize = strlen(ctdb_db->db_name);
413
414         old = ctdb_db->unhealthy_reason;
415         ctdb_db->unhealthy_reason = NULL;
416
417         val = tdb_fetch(tdb, key);
418         if (val.dsize > 0) {
419                 reason = talloc_strndup(ctdb_db,
420                                         (const char *)val.dptr,
421                                         val.dsize);
422                 if (reason == NULL) {
423                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
424                                            (int)val.dsize));
425                         ctdb_db->unhealthy_reason = old;
426                         free(val.dptr);
427                         return -1;
428                 }
429         }
430
431         if (val.dptr) {
432                 free(val.dptr);
433         }
434
435         talloc_free(old);
436         ctdb_db->unhealthy_reason = reason;
437         return 0;
438 }
439
440 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
441                                   struct ctdb_db_context *ctdb_db,
442                                   const char *given_reason,/* NULL means healthy */
443                                   int num_healthy_nodes)
444 {
445         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
446         int ret;
447         TDB_DATA key;
448         TDB_DATA val;
449         char *new_reason = NULL;
450         char *old_reason = NULL;
451
452         ret = tdb_transaction_start(tdb);
453         if (ret != 0) {
454                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
455                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
456                 return -1;
457         }
458
459         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
460         if (ret != 0) {
461                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
462                                    ctdb_db->db_name, ret));
463                 return -1;
464         }
465         old_reason = ctdb_db->unhealthy_reason;
466
467         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
468         key.dsize = strlen(ctdb_db->db_name);
469
470         if (given_reason) {
471                 new_reason = talloc_strdup(ctdb_db, given_reason);
472                 if (new_reason == NULL) {
473                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
474                                           given_reason));
475                         return -1;
476                 }
477         } else if (old_reason && num_healthy_nodes == 0) {
478                 /*
479                  * If the reason indicates ok, but there where no healthy nodes
480                  * available, that it means, we have not recovered valid content
481                  * of the db. So if there's an old reason, prefix it with
482                  * "NO-HEALTHY-NODES - "
483                  */
484                 const char *prefix;
485
486 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
487                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
488                 if (ret != 0) {
489                         prefix = _TMP_PREFIX;
490                 } else {
491                         prefix = "";
492                 }
493                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
494                                          prefix, old_reason);
495                 if (new_reason == NULL) {
496                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
497                                           prefix, old_reason));
498                         return -1;
499                 }
500 #undef _TMP_PREFIX
501         }
502
503         if (new_reason) {
504                 val.dptr = discard_const_p(uint8_t, new_reason);
505                 val.dsize = strlen(new_reason);
506
507                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
508                 if (ret != 0) {
509                         tdb_transaction_cancel(tdb);
510                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
511                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
512                                            ret, tdb_errorstr(tdb)));
513                         talloc_free(new_reason);
514                         return -1;
515                 }
516                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
517                                    ctdb_db->db_name, new_reason));
518         } else if (old_reason) {
519                 ret = tdb_delete(tdb, key);
520                 if (ret != 0) {
521                         tdb_transaction_cancel(tdb);
522                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
523                                            tdb_name(tdb), ctdb_db->db_name,
524                                            ret, tdb_errorstr(tdb)));
525                         talloc_free(new_reason);
526                         return -1;
527                 }
528                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
529                                    ctdb_db->db_name));
530         }
531
532         ret = tdb_transaction_commit(tdb);
533         if (ret != TDB_SUCCESS) {
534                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
535                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
536                 talloc_free(new_reason);
537                 return -1;
538         }
539
540         talloc_free(old_reason);
541         ctdb_db->unhealthy_reason = new_reason;
542
543         return 0;
544 }
545
546 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
547                                      struct ctdb_db_context *ctdb_db)
548 {
549         time_t now = time(NULL);
550         char *new_path;
551         char *new_reason;
552         int ret;
553         struct tm *tm;
554
555         tm = gmtime(&now);
556
557         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
558         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
559                                    "%04u%02u%02u%02u%02u%02u.0Z",
560                                    ctdb_db->db_path,
561                                    tm->tm_year+1900, tm->tm_mon+1,
562                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
563                                    tm->tm_sec);
564         if (new_path == NULL) {
565                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
566                 return -1;
567         }
568
569         new_reason = talloc_asprintf(ctdb_db,
570                                      "ERROR - Backup of corrupted TDB in '%s'",
571                                      new_path);
572         if (new_reason == NULL) {
573                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
574                 return -1;
575         }
576         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
577         talloc_free(new_reason);
578         if (ret != 0) {
579                 DEBUG(DEBUG_CRIT,(__location__
580                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
581                                  ctdb_db->db_path));
582                 return -1;
583         }
584
585         ret = rename(ctdb_db->db_path, new_path);
586         if (ret != 0) {
587                 DEBUG(DEBUG_CRIT,(__location__
588                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
589                                   ctdb_db->db_path, new_path,
590                                   errno, strerror(errno)));
591                 talloc_free(new_path);
592                 return -1;
593         }
594
595         DEBUG(DEBUG_CRIT,(__location__
596                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
597                          ctdb_db->db_path, new_path));
598         talloc_free(new_path);
599         return 0;
600 }
601
602 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
603 {
604         struct ctdb_db_context *ctdb_db;
605         int ret;
606         int ok = 0;
607         int fail = 0;
608
609         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
610                 if (!ctdb_db->persistent) {
611                         continue;
612                 }
613
614                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
615                 if (ret != 0) {
616                         DEBUG(DEBUG_ALERT,(__location__
617                                            " load persistent health for '%s' failed\n",
618                                            ctdb_db->db_path));
619                         return -1;
620                 }
621
622                 if (ctdb_db->unhealthy_reason == NULL) {
623                         ok++;
624                         DEBUG(DEBUG_INFO,(__location__
625                                    " persistent db '%s' healthy\n",
626                                    ctdb_db->db_path));
627                         continue;
628                 }
629
630                 fail++;
631                 DEBUG(DEBUG_ALERT,(__location__
632                                    " persistent db '%s' unhealthy: %s\n",
633                                    ctdb_db->db_path,
634                                    ctdb_db->unhealthy_reason));
635         }
636         DEBUG(DEBUG_NOTICE,
637               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
638                ok, fail));
639
640         if (fail != 0) {
641                 return -1;
642         }
643
644         return 0;
645 }
646
647
648 /*
649   mark a database - as healthy
650  */
651 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
652 {
653         uint32_t db_id = *(uint32_t *)indata.dptr;
654         struct ctdb_db_context *ctdb_db;
655         int ret;
656         bool may_recover = false;
657
658         ctdb_db = find_ctdb_db(ctdb, db_id);
659         if (!ctdb_db) {
660                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
661                 return -1;
662         }
663
664         if (ctdb_db->unhealthy_reason) {
665                 may_recover = true;
666         }
667
668         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
669         if (ret != 0) {
670                 DEBUG(DEBUG_ERR,(__location__
671                                  " ctdb_update_persistent_health(%s) failed\n",
672                                  ctdb_db->db_name));
673                 return -1;
674         }
675
676         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
677                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
678                                   ctdb_db->db_name));
679                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
680         }
681
682         return 0;
683 }
684
685 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
686                                    TDB_DATA indata,
687                                    TDB_DATA *outdata)
688 {
689         uint32_t db_id = *(uint32_t *)indata.dptr;
690         struct ctdb_db_context *ctdb_db;
691         int ret;
692
693         ctdb_db = find_ctdb_db(ctdb, db_id);
694         if (!ctdb_db) {
695                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
696                 return -1;
697         }
698
699         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
700         if (ret != 0) {
701                 DEBUG(DEBUG_ERR,(__location__
702                                  " ctdb_load_persistent_health(%s) failed\n",
703                                  ctdb_db->db_name));
704                 return -1;
705         }
706
707         *outdata = tdb_null;
708         if (ctdb_db->unhealthy_reason) {
709                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
710                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
711         }
712
713         return 0;
714 }
715
716
717 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
718 {
719         char *ropath;
720
721         if (ctdb_db->readonly) {
722                 return 0;
723         }
724
725         if (ctdb_db->persistent) {
726                 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
727                 return -1;
728         }
729
730         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
731         if (ropath == NULL) {
732                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
733                 return -1;
734         }
735         ctdb_db->rottdb = tdb_open(ropath, 
736                               ctdb->tunable.database_hash_size, 
737                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
738                               O_CREAT|O_RDWR, 0600);
739         if (ctdb_db->rottdb == NULL) {
740                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
741                 talloc_free(ropath);
742                 return -1;
743         }
744
745         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
746
747         ctdb_db->readonly = true;
748
749         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
750
751         talloc_free(ropath);
752         return 0;
753 }
754
755 /*
756   attach to a database, handling both persistent and non-persistent databases
757   return 0 on success, -1 on failure
758  */
759 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
760                              bool persistent, const char *unhealthy_reason,
761                              bool jenkinshash, bool mutexes)
762 {
763         struct ctdb_db_context *ctdb_db, *tmp_db;
764         int ret;
765         struct TDB_DATA key;
766         unsigned tdb_flags;
767         int mode = 0600;
768         int remaining_tries = 0;
769
770         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
771         CTDB_NO_MEMORY(ctdb, ctdb_db);
772
773         ctdb_db->ctdb = ctdb;
774         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
775         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
776
777         key.dsize = strlen(db_name)+1;
778         key.dptr  = discard_const(db_name);
779         ctdb_db->db_id = ctdb_hash(&key);
780         ctdb_db->persistent = persistent;
781
782         if (!ctdb_db->persistent) {
783                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
784                 if (ctdb_db->delete_queue == NULL) {
785                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
786                 }
787
788                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
789         }
790
791         /* check for hash collisions */
792         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
793                 if (tmp_db->db_id == ctdb_db->db_id) {
794                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
795                                  tmp_db->db_id, db_name, tmp_db->db_name));
796                         talloc_free(ctdb_db);
797                         return -1;
798                 }
799         }
800
801         if (persistent) {
802                 if (unhealthy_reason) {
803                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
804                                                             unhealthy_reason, 0);
805                         if (ret != 0) {
806                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
807                                                    ctdb_db->db_name, unhealthy_reason, ret));
808                                 talloc_free(ctdb_db);
809                                 return -1;
810                         }
811                 }
812
813                 if (ctdb->max_persistent_check_errors > 0) {
814                         remaining_tries = 1;
815                 }
816                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
817                         remaining_tries = 0;
818                 }
819
820                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
821                 if (ret != 0) {
822                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
823                                    ctdb_db->db_name, ret));
824                         talloc_free(ctdb_db);
825                         return -1;
826                 }
827         }
828
829         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
830                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
831                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
832                 talloc_free(ctdb_db);
833                 return -1;
834         }
835
836         if (ctdb_db->unhealthy_reason) {
837                 /* this is just a warning, but we want that in the log file! */
838                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
839                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
840         }
841
842         /* open the database */
843         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
844                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
845                                            db_name, ctdb->pnn);
846
847         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
848         if (ctdb->valgrinding) {
849                 tdb_flags |= TDB_NOMMAP;
850         }
851         tdb_flags |= TDB_DISALLOW_NESTING;
852         if (jenkinshash) {
853                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
854         }
855 #ifdef TDB_MUTEX_LOCKING
856         if (ctdb->tunable.mutex_enabled && mutexes &&
857             tdb_runtime_check_for_robust_mutexes()) {
858                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
859         }
860 #endif
861
862 again:
863         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
864                                       ctdb->tunable.database_hash_size, 
865                                       tdb_flags, 
866                                       O_CREAT|O_RDWR, mode);
867         if (ctdb_db->ltdb == NULL) {
868                 struct stat st;
869                 int saved_errno = errno;
870
871                 if (!persistent) {
872                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
873                                           ctdb_db->db_path,
874                                           saved_errno,
875                                           strerror(saved_errno)));
876                         talloc_free(ctdb_db);
877                         return -1;
878                 }
879
880                 if (remaining_tries == 0) {
881                         DEBUG(DEBUG_CRIT,(__location__
882                                           "Failed to open persistent tdb '%s': %d - %s\n",
883                                           ctdb_db->db_path,
884                                           saved_errno,
885                                           strerror(saved_errno)));
886                         talloc_free(ctdb_db);
887                         return -1;
888                 }
889
890                 ret = stat(ctdb_db->db_path, &st);
891                 if (ret != 0) {
892                         DEBUG(DEBUG_CRIT,(__location__
893                                           "Failed to open persistent tdb '%s': %d - %s\n",
894                                           ctdb_db->db_path,
895                                           saved_errno,
896                                           strerror(saved_errno)));
897                         talloc_free(ctdb_db);
898                         return -1;
899                 }
900
901                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
902                 if (ret != 0) {
903                         DEBUG(DEBUG_CRIT,(__location__
904                                           "Failed to open persistent tdb '%s': %d - %s\n",
905                                           ctdb_db->db_path,
906                                           saved_errno,
907                                           strerror(saved_errno)));
908                         talloc_free(ctdb_db);
909                         return -1;
910                 }
911
912                 remaining_tries--;
913                 mode = st.st_mode;
914                 goto again;
915         }
916
917         if (!persistent) {
918                 ctdb_check_db_empty(ctdb_db);
919         } else {
920                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
921                 if (ret != 0) {
922                         int fd;
923                         struct stat st;
924
925                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
926                                           ctdb_db->db_path, ret,
927                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
928                         if (remaining_tries == 0) {
929                                 talloc_free(ctdb_db);
930                                 return -1;
931                         }
932
933                         fd = tdb_fd(ctdb_db->ltdb->tdb);
934                         ret = fstat(fd, &st);
935                         if (ret != 0) {
936                                 DEBUG(DEBUG_CRIT,(__location__
937                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
938                                                   ctdb_db->db_path,
939                                                   errno,
940                                                   strerror(errno)));
941                                 talloc_free(ctdb_db);
942                                 return -1;
943                         }
944
945                         /* close the TDB */
946                         talloc_free(ctdb_db->ltdb);
947                         ctdb_db->ltdb = NULL;
948
949                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
950                         if (ret != 0) {
951                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
952                                                   ctdb_db->db_path));
953                                 talloc_free(ctdb_db);
954                                 return -1;
955                         }
956
957                         remaining_tries--;
958                         mode = st.st_mode;
959                         goto again;
960                 }
961         }
962
963         /* set up a rb tree we can use to track which records we have a 
964            fetch-lock in-flight for so we can defer any additional calls
965            for the same record.
966          */
967         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
968         if (ctdb_db->deferred_fetch == NULL) {
969                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
970                 talloc_free(ctdb_db);
971                 return -1;
972         }
973
974         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
975         if (ctdb_db->defer_dmaster == NULL) {
976                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
977                                   ctdb_db->db_name));
978                 talloc_free(ctdb_db);
979                 return -1;
980         }
981
982         DLIST_ADD(ctdb->db_list, ctdb_db);
983
984         /* setting this can help some high churn databases */
985         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
986
987         /* 
988            all databases support the "null" function. we need this in
989            order to do forced migration of records
990         */
991         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
992         if (ret != 0) {
993                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
994                 talloc_free(ctdb_db);
995                 return -1;
996         }
997
998         /* 
999            all databases support the "fetch" function. we need this
1000            for efficient Samba3 ctdb fetch
1001         */
1002         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1003         if (ret != 0) {
1004                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1005                 talloc_free(ctdb_db);
1006                 return -1;
1007         }
1008
1009         /* 
1010            all databases support the "fetch_with_header" function. we need this
1011            for efficient readonly record fetches
1012         */
1013         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1014         if (ret != 0) {
1015                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1016                 talloc_free(ctdb_db);
1017                 return -1;
1018         }
1019
1020         ret = ctdb_vacuum_init(ctdb_db);
1021         if (ret != 0) {
1022                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1023                                   "database '%s'\n", ctdb_db->db_name));
1024                 talloc_free(ctdb_db);
1025                 return -1;
1026         }
1027
1028         ret = ctdb_migration_init(ctdb_db);
1029         if (ret != 0) {
1030                 DEBUG(DEBUG_ERR,
1031                       ("Failed to setup migration tracking for db '%s'\n",
1032                        ctdb_db->db_name));
1033                 talloc_free(ctdb_db);
1034                 return -1;
1035         }
1036
1037         ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1038                            &ctdb_db->lock_log);
1039         if (ret != 0) {
1040                 DEBUG(DEBUG_ERR,
1041                       ("Failed to setup lock logging for db '%s'\n",
1042                        ctdb_db->db_name));
1043                 talloc_free(ctdb_db);
1044                 return -1;
1045         }
1046
1047         ctdb_db->generation = ctdb->vnn_map->generation;
1048
1049         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1050                             ctdb_db->db_path, tdb_flags));
1051
1052         /* success */
1053         return 0;
1054 }
1055
1056
/*
  one attach control that has been put on hold; entries are chained on
  ctdb->deferred_attach
 */
struct ctdb_deferred_attach_context {
	/* doubly linked list pointers used by the DLIST_* macros */
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	/* daemon context owning the deferred_attach list */
	struct ctdb_context *ctdb;
	/* the original control request, talloc-stolen onto this context */
	struct ctdb_req_control_old *c;
};
1062
1063
1064 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1065 {
1066         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1067
1068         return 0;
1069 }
1070
1071 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1072                                          struct tevent_timer *te,
1073                                          struct timeval t, void *private_data)
1074 {
1075         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1076         struct ctdb_context *ctdb = da_ctx->ctdb;
1077
1078         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1079         talloc_free(da_ctx);
1080 }
1081
1082 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1083                                           struct tevent_timer *te,
1084                                           struct timeval t, void *private_data)
1085 {
1086         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1087         struct ctdb_context *ctdb = da_ctx->ctdb;
1088
1089         /* This talloc-steals the packet ->c */
1090         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1091         talloc_free(da_ctx);
1092 }
1093
1094 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1095 {
1096         struct ctdb_deferred_attach_context *da_ctx;
1097
1098         /* call it from the main event loop as soon as the current event 
1099            finishes.
1100          */
1101         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1102                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1103                 tevent_add_timer(ctdb->ev, da_ctx,
1104                                  timeval_current_ofs(1,0),
1105                                  ctdb_deferred_attach_callback, da_ctx);
1106         }
1107
1108         return 0;
1109 }
1110
1111 /*
1112   a client has asked to attach a new database
1113  */
1114 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1115                                TDB_DATA *outdata, uint64_t tdb_flags, 
1116                                bool persistent, uint32_t client_id,
1117                                struct ctdb_req_control_old *c,
1118                                bool *async_reply)
1119 {
1120         const char *db_name = (const char *)indata.dptr;
1121         struct ctdb_db_context *db;
1122         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1123         struct ctdb_client *client = NULL;
1124         bool with_jenkinshash, with_mutexes;
1125
1126         if (ctdb->tunable.allow_client_db_attach == 0) {
1127                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1128                                   "AllowClientDBAccess == 0\n", db_name));
1129                 return -1;
1130         }
1131
1132         /* don't allow any local clients to attach while we are in recovery mode
1133          * except for the recovery daemon.
1134          * allow all attach from the network since these are always from remote
1135          * recovery daemons.
1136          */
1137         if (client_id != 0) {
1138                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1139         }
1140         if (client != NULL) {
1141                 /* If the node is inactive it is not part of the cluster
1142                    and we should not allow clients to attach to any
1143                    databases
1144                 */
1145                 if (node->flags & NODE_FLAGS_INACTIVE) {
1146                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1147                         return -1;
1148                 }
1149
1150                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1151                     client->pid != ctdb->recoverd_pid &&
1152                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1153                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1154
1155                         if (da_ctx == NULL) {
1156                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1157                                 return -1;
1158                         }
1159
1160                         da_ctx->ctdb = ctdb;
1161                         da_ctx->c = talloc_steal(da_ctx, c);
1162                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1163                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1164
1165                         tevent_add_timer(ctdb->ev, da_ctx,
1166                                          timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1167                                          ctdb_deferred_attach_timeout, da_ctx);
1168
1169                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1170                         *async_reply = true;
1171                         return 0;
1172                 }
1173         }
1174
1175         /* the client can optionally pass additional tdb flags, but we
1176            only allow a subset of those on the database in ctdb. Note
1177            that tdb_flags is passed in via the (otherwise unused)
1178            srvid to the attach control */
1179 #ifdef TDB_MUTEX_LOCKING
1180         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1181 #else
1182         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1183 #endif
1184
1185         /* see if we already have this name */
1186         db = ctdb_db_handle(ctdb, db_name);
1187         if (db) {
1188                 if (db->persistent != persistent) {
1189                         DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1190                                           "database %s\n", persistent ? "" : "non-",
1191                                           db-> persistent ? "" : "non-", db_name));
1192                         return -1;
1193                 }
1194                 outdata->dptr  = (uint8_t *)&db->db_id;
1195                 outdata->dsize = sizeof(db->db_id);
1196                 return 0;
1197         }
1198
1199         with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1200 #ifdef TDB_MUTEX_LOCKING
1201         with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1202 #else
1203         with_mutexes = false;
1204 #endif
1205
1206         if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1207                               with_jenkinshash, with_mutexes) != 0) {
1208                 return -1;
1209         }
1210
1211         db = ctdb_db_handle(ctdb, db_name);
1212         if (!db) {
1213                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1214                 return -1;
1215         }
1216
1217         /* remember the flags the client has specified */
1218         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1219
1220         outdata->dptr  = (uint8_t *)&db->db_id;
1221         outdata->dsize = sizeof(db->db_id);
1222
1223         /* Try to ensure it's locked in mem */
1224         lockdown_memory(ctdb->valgrinding);
1225
1226         /* tell all the other nodes about this database */
1227         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1228                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1229                                                 CTDB_CONTROL_DB_ATTACH,
1230                                  0, CTDB_CTRL_FLAG_NOREPLY,
1231                                  indata, NULL, NULL);
1232
1233         /* success */
1234         return 0;
1235 }
1236
1237 /*
1238  * a client has asked to detach from a database
1239  */
1240 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1241                                uint32_t client_id)
1242 {
1243         uint32_t db_id;
1244         struct ctdb_db_context *ctdb_db;
1245         struct ctdb_client *client = NULL;
1246
1247         db_id = *(uint32_t *)indata.dptr;
1248         ctdb_db = find_ctdb_db(ctdb, db_id);
1249         if (ctdb_db == NULL) {
1250                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1251                                   db_id));
1252                 return -1;
1253         }
1254
1255         if (ctdb->tunable.allow_client_db_attach == 1) {
1256                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1257                                   "Clients are allowed access to databases "
1258                                   "(AllowClientDBAccess == 1)\n",
1259                                   ctdb_db->db_name));
1260                 return -1;
1261         }
1262
1263         if (ctdb_db->persistent) {
1264                 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1265                                   "denied\n", ctdb_db->db_name));
1266                 return -1;
1267         }
1268
1269         /* Cannot detach from database when in recovery */
1270         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1271                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1272                 return -1;
1273         }
1274
1275         /* If a control comes from a client, then broadcast it to all nodes.
1276          * Do the actual detach only if the control comes from other daemons.
1277          */
1278         if (client_id != 0) {
1279                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1280                 if (client != NULL) {
1281                         /* forward the control to all the nodes */
1282                         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1283                                                  CTDB_CONTROL_DB_DETACH, 0,
1284                                                  CTDB_CTRL_FLAG_NOREPLY,
1285                                                  indata, NULL, NULL);
1286                         return 0;
1287                 }
1288                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1289                                   "for database '%s'\n", ctdb_db->db_name));
1290                 return -1;
1291         }
1292
1293         /* Detach database from recoverd */
1294         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1295                                      CTDB_SRVID_DETACH_DATABASE,
1296                                      indata) != 0) {
1297                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1298                 return -1;
1299         }
1300
1301         /* Disable vacuuming and drop all vacuuming data */
1302         talloc_free(ctdb_db->vacuum_handle);
1303         talloc_free(ctdb_db->delete_queue);
1304
1305         /* Terminate any deferred fetch */
1306         talloc_free(ctdb_db->deferred_fetch);
1307
1308         /* Terminate any traverses */
1309         while (ctdb_db->traverse) {
1310                 talloc_free(ctdb_db->traverse);
1311         }
1312
1313         /* Terminate any revokes */
1314         while (ctdb_db->revokechild_active) {
1315                 talloc_free(ctdb_db->revokechild_active);
1316         }
1317
1318         /* Free readonly tracking database */
1319         if (ctdb_db->readonly) {
1320                 talloc_free(ctdb_db->rottdb);
1321         }
1322
1323         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1324
1325         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1326                              ctdb_db->db_name));
1327         talloc_free(ctdb_db);
1328
1329         return 0;
1330 }
1331
1332 /*
1333   attach to all existing persistent databases
1334  */
1335 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1336                                   const char *unhealthy_reason)
1337 {
1338         DIR *d;
1339         struct dirent *de;
1340
1341         /* open the persistent db directory and scan it for files */
1342         d = opendir(ctdb->db_directory_persistent);
1343         if (d == NULL) {
1344                 return 0;
1345         }
1346
1347         while ((de=readdir(d))) {
1348                 char *p, *s, *q;
1349                 size_t len = strlen(de->d_name);
1350                 uint32_t node;
1351                 int invalid_name = 0;
1352                 
1353                 s = talloc_strdup(ctdb, de->d_name);
1354                 if (s == NULL) {
1355                         closedir(d);
1356                         CTDB_NO_MEMORY(ctdb, s);
1357                 }
1358
1359                 /* only accept names ending in .tdb */
1360                 p = strstr(s, ".tdb.");
1361                 if (len < 7 || p == NULL) {
1362                         talloc_free(s);
1363                         continue;
1364                 }
1365
1366                 /* only accept names ending with .tdb. and any number of digits */
1367                 q = p+5;
1368                 while (*q != 0 && invalid_name == 0) {
1369                         if (!isdigit(*q++)) {
1370                                 invalid_name = 1;
1371                         }
1372                 }
1373                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1374                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1375                         talloc_free(s);
1376                         continue;
1377                 }
1378                 p[4] = 0;
1379
1380                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1381                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1382                         closedir(d);
1383                         talloc_free(s);
1384                         return -1;
1385                 }
1386
1387                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1388
1389                 talloc_free(s);
1390         }
1391         closedir(d);
1392         return 0;
1393 }
1394
1395 int ctdb_attach_databases(struct ctdb_context *ctdb)
1396 {
1397         int ret;
1398         char *persistent_health_path = NULL;
1399         char *unhealthy_reason = NULL;
1400         bool first_try = true;
1401
1402         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1403                                                  ctdb->db_directory_state,
1404                                                  PERSISTENT_HEALTH_TDB,
1405                                                  ctdb->pnn);
1406         if (persistent_health_path == NULL) {
1407                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1408                 return -1;
1409         }
1410
1411 again:
1412
1413         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1414                                                    0, TDB_DISALLOW_NESTING,
1415                                                    O_CREAT | O_RDWR, 0600);
1416         if (ctdb->db_persistent_health == NULL) {
1417                 struct tdb_wrap *tdb;
1418
1419                 if (!first_try) {
1420                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1421                                           persistent_health_path,
1422                                           errno,
1423                                           strerror(errno)));
1424                         talloc_free(persistent_health_path);
1425                         talloc_free(unhealthy_reason);
1426                         return -1;
1427                 }
1428                 first_try = false;
1429
1430                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1431                                                    persistent_health_path,
1432                                                    "was cleared after a failure",
1433                                                    "manual verification needed");
1434                 if (unhealthy_reason == NULL) {
1435                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1436                         talloc_free(persistent_health_path);
1437                         return -1;
1438                 }
1439
1440                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1441                                   persistent_health_path));
1442                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1443                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1444                                     O_CREAT | O_RDWR, 0600);
1445                 if (tdb) {
1446                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1447                                           persistent_health_path,
1448                                           errno,
1449                                           strerror(errno)));
1450                         talloc_free(persistent_health_path);
1451                         talloc_free(unhealthy_reason);
1452                         return -1;
1453                 }
1454
1455                 talloc_free(tdb);
1456                 goto again;
1457         }
1458         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1459         if (ret != 0) {
1460                 struct tdb_wrap *tdb;
1461
1462                 talloc_free(ctdb->db_persistent_health);
1463                 ctdb->db_persistent_health = NULL;
1464
1465                 if (!first_try) {
1466                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1467                                           persistent_health_path));
1468                         talloc_free(persistent_health_path);
1469                         talloc_free(unhealthy_reason);
1470                         return -1;
1471                 }
1472                 first_try = false;
1473
1474                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1475                                                    persistent_health_path,
1476                                                    "was cleared after a failure",
1477                                                    "manual verification needed");
1478                 if (unhealthy_reason == NULL) {
1479                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1480                         talloc_free(persistent_health_path);
1481                         return -1;
1482                 }
1483
1484                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1485                                   persistent_health_path));
1486                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1487                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1488                                     O_CREAT | O_RDWR, 0600);
1489                 if (tdb) {
1490                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1491                                           persistent_health_path,
1492                                           errno,
1493                                           strerror(errno)));
1494                         talloc_free(persistent_health_path);
1495                         talloc_free(unhealthy_reason);
1496                         return -1;
1497                 }
1498
1499                 talloc_free(tdb);
1500                 goto again;
1501         }
1502         talloc_free(persistent_health_path);
1503
1504         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1505         talloc_free(unhealthy_reason);
1506         if (ret != 0) {
1507                 return ret;
1508         }
1509
1510         return 0;
1511 }
1512
1513 /*
1514   called when a broadcast seqnum update comes in
1515  */
1516 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1517 {
1518         struct ctdb_db_context *ctdb_db;
1519         if (srcnode == ctdb->pnn) {
1520                 /* don't update ourselves! */
1521                 return 0;
1522         }
1523
1524         ctdb_db = find_ctdb_db(ctdb, db_id);
1525         if (!ctdb_db) {
1526                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1527                 return -1;
1528         }
1529
1530         if (ctdb_db->unhealthy_reason) {
1531                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1532                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1533                 return -1;
1534         }
1535
1536         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1537         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1538         return 0;
1539 }
1540
1541 /*
1542   timer to check for seqnum changes in a ltdb and propogate them
1543  */
1544 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1545                                    struct tevent_timer *te,
1546                                    struct timeval t, void *p)
1547 {
1548         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1549         struct ctdb_context *ctdb = ctdb_db->ctdb;
1550         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1551         if (new_seqnum != ctdb_db->seqnum) {
1552                 /* something has changed - propogate it */
1553                 TDB_DATA data;
1554                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1555                 data.dsize = sizeof(uint32_t);
1556                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1557                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1558                                          data, NULL, NULL);             
1559         }
1560         ctdb_db->seqnum = new_seqnum;
1561
1562         /* setup a new timer */
1563         ctdb_db->seqnum_update =
1564                 tevent_add_timer(ctdb->ev, ctdb_db,
1565                                  timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1566                                                      (ctdb->tunable.seqnum_interval%1000)*1000),
1567                                  ctdb_ltdb_seqnum_check, ctdb_db);
1568 }
1569
1570 /*
1571   enable seqnum handling on this db
1572  */
1573 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1574 {
1575         struct ctdb_db_context *ctdb_db;
1576         ctdb_db = find_ctdb_db(ctdb, db_id);
1577         if (!ctdb_db) {
1578                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1579                 return -1;
1580         }
1581
1582         if (ctdb_db->seqnum_update == NULL) {
1583                 ctdb_db->seqnum_update = tevent_add_timer(
1584                         ctdb->ev, ctdb_db,
1585                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1586                                             (ctdb->tunable.seqnum_interval%1000)*1000),
1587                         ctdb_ltdb_seqnum_check, ctdb_db);
1588         }
1589
1590         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1591         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1592         return 0;
1593 }
1594
1595 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1596 {
1597         if (ctdb_db->sticky) {
1598                 return 0;
1599         }
1600
1601         if (ctdb_db->persistent) {
1602                 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1603                 return -1;
1604         }
1605
1606         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1607
1608         ctdb_db->sticky = true;
1609
1610         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1611
1612         return 0;
1613 }
1614
1615 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1616 {
1617         struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1618         int i;
1619
1620         for (i=0; i<MAX_HOT_KEYS; i++) {
1621                 if (s->hot_keys[i].key.dsize > 0) {
1622                         talloc_free(s->hot_keys[i].key.dptr);
1623                 }
1624         }
1625
1626         ZERO_STRUCT(ctdb_db->statistics);
1627 }
1628
1629 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1630                                 uint32_t db_id,
1631                                 TDB_DATA *outdata)
1632 {
1633         struct ctdb_db_context *ctdb_db;
1634         struct ctdb_db_statistics_old *stats;
1635         int i;
1636         int len;
1637         char *ptr;
1638
1639         ctdb_db = find_ctdb_db(ctdb, db_id);
1640         if (!ctdb_db) {
1641                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1642                 return -1;
1643         }
1644
1645         len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1646         for (i = 0; i < MAX_HOT_KEYS; i++) {
1647                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1648         }
1649
1650         stats = talloc_size(outdata, len);
1651         if (stats == NULL) {
1652                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1653                 return -1;
1654         }
1655
1656         memcpy(stats, &ctdb_db->statistics,
1657                offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1658
1659         stats->num_hot_keys = MAX_HOT_KEYS;
1660
1661         ptr = &stats->hot_keys_wire[0];
1662         for (i = 0; i < MAX_HOT_KEYS; i++) {
1663                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1664                        ctdb_db->statistics.hot_keys[i].key.dsize);
1665                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1666         }
1667
1668         outdata->dptr  = (uint8_t *)stats;
1669         outdata->dsize = len;
1670
1671         return 0;
1672 }