ctdb-daemon: Add tracking of migration records
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
45
46 /**
47  * write a record to a normal database
48  *
49  * This is the server-variant of the ctdb_ltdb_store function.
50  * It contains logic to determine whether a record should be
51  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52  * controls to the local ctdb daemon if apporpriate.
53  */
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
55                                   TDB_DATA key,
56                                   struct ctdb_ltdb_header *header,
57                                   TDB_DATA data)
58 {
59         struct ctdb_context *ctdb = ctdb_db->ctdb;
60         TDB_DATA rec[2];
61         uint32_t hsize = sizeof(struct ctdb_ltdb_header);
62         int ret;
63         bool seqnum_suppressed = false;
64         bool keep = false;
65         bool schedule_for_deletion = false;
66         bool remove_from_delete_queue = false;
67         uint32_t lmaster;
68
69         if (ctdb->flags & CTDB_FLAG_TORTURE) {
70                 TDB_DATA old;
71                 struct ctdb_ltdb_header *h2;
72
73                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74                 h2 = (struct ctdb_ltdb_header *)old.dptr;
75                 if (old.dptr != NULL &&
76                     old.dsize >= hsize &&
77                     h2->rsn > header->rsn) {
78                         DEBUG(DEBUG_ERR,
79                               ("RSN regression! %"PRIu64" %"PRIu64"\n",
80                                h2->rsn, header->rsn));
81                 }
82                 if (old.dptr) {
83                         free(old.dptr);
84                 }
85         }
86
87         if (ctdb->vnn_map == NULL) {
88                 /*
89                  * Called from a client: always store the record
90                  * Also don't call ctdb_lmaster since it uses the vnn_map!
91                  */
92                 keep = true;
93                 goto store;
94         }
95
96         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
97
98         /*
99          * If we migrate an empty record off to another node
100          * and the record has not been migrated with data,
101          * delete the record instead of storing the empty record.
102          */
103         if (data.dsize != 0) {
104                 keep = true;
105         } else if (header->flags & CTDB_REC_RO_FLAGS) {
106                 keep = true;
107         } else if (ctdb_db->persistent) {
108                 keep = true;
109         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
110                 /*
111                  * The record is not created by the client but
112                  * automatically by the ctdb_ltdb_fetch logic that
113                  * creates a record with an initial header in the
114                  * ltdb before trying to migrate the record from
115                  * the current lmaster. Keep it instead of trying
116                  * to delete the non-existing record...
117                  */
118                 keep = true;
119                 schedule_for_deletion = true;
120         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
121                 keep = true;
122         } else if (ctdb_db->ctdb->pnn == lmaster) {
123                 /*
124                  * If we are lmaster, then we usually keep the record.
125                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
126                  * and the record is empty and has never been migrated
127                  * with data, then we should delete it instead of storing it.
128                  * This is part of the vacuuming process.
129                  *
130                  * The reason that we usually need to store even empty records
131                  * on the lmaster is that a client operating directly on the
132                  * lmaster (== dmaster) expects the local copy of the record to
133                  * exist after successful ctdb migrate call. If the record does
134                  * not exist, the client goes into a migrate loop and eventually
135                  * fails. So storing the empty record makes sure that we do not
136                  * need to change the client code.
137                  */
138                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
139                         keep = true;
140                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
141                         keep = true;
142                 }
143         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
144                 keep = true;
145         }
146
147         if (keep) {
148                 if (!ctdb_db->persistent &&
149                     (ctdb_db->ctdb->pnn == header->dmaster) &&
150                     !(header->flags & CTDB_REC_RO_FLAGS))
151                 {
152                         header->rsn++;
153
154                         if (data.dsize == 0) {
155                                 schedule_for_deletion = true;
156                         }
157                 }
158                 remove_from_delete_queue = !schedule_for_deletion;
159         }
160
161 store:
162         /*
163          * The VACUUM_MIGRATED flag is only set temporarily for
164          * the above logic when the record was retrieved by a
165          * VACUUM_MIGRATE call and should not be stored in the
166          * database.
167          *
168          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
169          * and there are two cases in which the corresponding record
170          * is stored in the local database:
171          * 1. The record has been migrated with data in the past
172          *    (the MIGRATED_WITH_DATA record flag is set).
173          * 2. The record has been filled with data again since it
174          *    had been submitted in the VACUUM_FETCH message to the
175          *    lmaster.
176          * For such records it is important to not store the
177          * VACUUM_MIGRATED flag in the database.
178          */
179         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
180
181         /*
182          * Similarly, clear the AUTOMATIC flag which should not enter
183          * the local database copy since this would require client
184          * modifications to clear the flag when the client stores
185          * the record.
186          */
187         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
188
189         rec[0].dsize = hsize;
190         rec[0].dptr = (uint8_t *)header;
191
192         rec[1].dsize = data.dsize;
193         rec[1].dptr = data.dptr;
194
195         /* Databases with seqnum updates enabled only get their seqnum
196            changes when/if we modify the data */
197         if (ctdb_db->seqnum_update != NULL) {
198                 TDB_DATA old;
199                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
200
201                 if ((old.dsize == hsize + data.dsize) &&
202                     memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
203                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
204                         seqnum_suppressed = true;
205                 }
206                 if (old.dptr != NULL) {
207                         free(old.dptr);
208                 }
209         }
210
211         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
212                             ctdb_db->db_name,
213                             keep?"storing":"deleting",
214                             ctdb_hash(&key)));
215
216         if (keep) {
217                 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
218         } else {
219                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
220         }
221
222         if (ret != 0) {
223                 int lvl = DEBUG_ERR;
224
225                 if (keep == false &&
226                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
227                 {
228                         lvl = DEBUG_DEBUG;
229                 }
230
231                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
232                             "%d - %s\n",
233                             ctdb_db->db_name,
234                             keep?"store":"delete", ret,
235                             tdb_errorstr(ctdb_db->ltdb->tdb)));
236
237                 schedule_for_deletion = false;
238                 remove_from_delete_queue = false;
239         }
240         if (seqnum_suppressed) {
241                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
242         }
243
244         if (schedule_for_deletion) {
245                 int ret2;
246                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
247                 if (ret2 != 0) {
248                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
249                 }
250         }
251
252         if (remove_from_delete_queue) {
253                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
254         }
255
256         return ret;
257 }
258
/*
  state kept while a request waits for a record chainlock; it is
  handed to lock_fetch_callback() once the lock request completes
 */
struct lock_fetch_state {
        /* daemon context, taken from ctdb_db->ctdb when the request is queued */
        struct ctdb_context *ctdb;
        /* database the deferred request refers to */
        struct ctdb_db_context *ctdb_db;
        /* packet handler the deferred request is handed back to */
        void (*recv_pkt)(void *ctx, struct ctdb_req_header *pkt);
        /* first argument passed to recv_pkt */
        void *recv_context;
        /* the deferred request packet */
        struct ctdb_req_header *hdr;
        /* ctdb_db->generation at the time the request was queued */
        uint32_t generation;
        /* if true, requeue the packet even when the generation has changed */
        bool ignore_generation;
};
268
269 /*
270   called when we should retry the operation
271  */
272 static void lock_fetch_callback(void *p, bool locked)
273 {
274         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
275         if (!state->ignore_generation &&
276             state->generation != state->ctdb_db->generation) {
277                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
278                 talloc_free(state->hdr);
279                 return;
280         }
281         state->recv_pkt(state->recv_context, state->hdr);
282         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
283 }
284
285
286 /*
287   do a non-blocking ltdb_lock, deferring this ctdb request until we
288   have the chainlock
289
290   It does the following:
291
292    1) tries to get the chainlock. If it succeeds, then it returns 0
293
294    2) if it fails to get a chainlock immediately then it sets up a
295    non-blocking chainlock via ctdb_lock_record, and when it gets the
296    chainlock it re-submits this ctdb request to the main packet
297    receive function.
298
299    This effectively queues all ctdb requests that cannot be
300    immediately satisfied until it can get the lock. This means that
301    the main ctdb daemon will not block waiting for a chainlock held by
302    a client
303
304    There are 3 possible return values:
305
306        0:    means that it got the lock immediately.
307       -1:    means that it failed to get the lock, and won't retry
308       -2:    means that it failed to get the lock immediately, but will retry
309  */
310 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
311                            TDB_DATA key, struct ctdb_req_header *hdr,
312                            void (*recv_pkt)(void *, struct ctdb_req_header *),
313                            void *recv_context, bool ignore_generation)
314 {
315         int ret;
316         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
317         struct lock_request *lreq;
318         struct lock_fetch_state *state;
319         
320         ret = tdb_chainlock_nonblock(tdb, key);
321
322         if (ret != 0 &&
323             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
324                 /* a hard failure - don't try again */
325                 return -1;
326         }
327
328         /* when torturing, ensure we test the contended path */
329         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
330             random() % 5 == 0) {
331                 ret = -1;
332                 tdb_chainunlock(tdb, key);
333         }
334
335         /* first the non-contended path */
336         if (ret == 0) {
337                 return 0;
338         }
339
340         state = talloc(hdr, struct lock_fetch_state);
341         state->ctdb = ctdb_db->ctdb;
342         state->ctdb_db = ctdb_db;
343         state->hdr = hdr;
344         state->recv_pkt = recv_pkt;
345         state->recv_context = recv_context;
346         state->generation = ctdb_db->generation;
347         state->ignore_generation = ignore_generation;
348
349         /* now the contended path */
350         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
351         if (lreq == NULL) {
352                 return -1;
353         }
354
355         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
356            so it won't be freed yet */
357         talloc_steal(state, hdr);
358
359         /* now tell the caller than we will retry asynchronously */
360         return -2;
361 }
362
363 /*
364   a varient of ctdb_ltdb_lock_requeue that also fetches the record
365  */
366 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
367                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
368                                  struct ctdb_req_header *hdr, TDB_DATA *data,
369                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
370                                  void *recv_context, bool ignore_generation)
371 {
372         int ret;
373
374         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
375                                      recv_context, ignore_generation);
376         if (ret == 0) {
377                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
378                 if (ret != 0) {
379                         int uret;
380                         uret = ctdb_ltdb_unlock(ctdb_db, key);
381                         if (uret != 0) {
382                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
383                         }
384                 }
385         }
386         return ret;
387 }
388
389
390 /*
391   paraoid check to see if the db is empty
392  */
393 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
394 {
395         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
396         int count = tdb_traverse_read(tdb, NULL, NULL);
397         if (count != 0) {
398                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
399                          ctdb_db->db_path));
400                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
401         }
402 }
403
404 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
405                                 struct ctdb_db_context *ctdb_db)
406 {
407         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
408         char *old;
409         char *reason = NULL;
410         TDB_DATA key;
411         TDB_DATA val;
412
413         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
414         key.dsize = strlen(ctdb_db->db_name);
415
416         old = ctdb_db->unhealthy_reason;
417         ctdb_db->unhealthy_reason = NULL;
418
419         val = tdb_fetch(tdb, key);
420         if (val.dsize > 0) {
421                 reason = talloc_strndup(ctdb_db,
422                                         (const char *)val.dptr,
423                                         val.dsize);
424                 if (reason == NULL) {
425                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
426                                            (int)val.dsize));
427                         ctdb_db->unhealthy_reason = old;
428                         free(val.dptr);
429                         return -1;
430                 }
431         }
432
433         if (val.dptr) {
434                 free(val.dptr);
435         }
436
437         talloc_free(old);
438         ctdb_db->unhealthy_reason = reason;
439         return 0;
440 }
441
442 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
443                                   struct ctdb_db_context *ctdb_db,
444                                   const char *given_reason,/* NULL means healthy */
445                                   int num_healthy_nodes)
446 {
447         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
448         int ret;
449         TDB_DATA key;
450         TDB_DATA val;
451         char *new_reason = NULL;
452         char *old_reason = NULL;
453
454         ret = tdb_transaction_start(tdb);
455         if (ret != 0) {
456                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
457                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
458                 return -1;
459         }
460
461         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
462         if (ret != 0) {
463                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
464                                    ctdb_db->db_name, ret));
465                 return -1;
466         }
467         old_reason = ctdb_db->unhealthy_reason;
468
469         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
470         key.dsize = strlen(ctdb_db->db_name);
471
472         if (given_reason) {
473                 new_reason = talloc_strdup(ctdb_db, given_reason);
474                 if (new_reason == NULL) {
475                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
476                                           given_reason));
477                         return -1;
478                 }
479         } else if (old_reason && num_healthy_nodes == 0) {
480                 /*
481                  * If the reason indicates ok, but there where no healthy nodes
482                  * available, that it means, we have not recovered valid content
483                  * of the db. So if there's an old reason, prefix it with
484                  * "NO-HEALTHY-NODES - "
485                  */
486                 const char *prefix;
487
488 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
489                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
490                 if (ret != 0) {
491                         prefix = _TMP_PREFIX;
492                 } else {
493                         prefix = "";
494                 }
495                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
496                                          prefix, old_reason);
497                 if (new_reason == NULL) {
498                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
499                                           prefix, old_reason));
500                         return -1;
501                 }
502 #undef _TMP_PREFIX
503         }
504
505         if (new_reason) {
506                 val.dptr = discard_const_p(uint8_t, new_reason);
507                 val.dsize = strlen(new_reason);
508
509                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
510                 if (ret != 0) {
511                         tdb_transaction_cancel(tdb);
512                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
513                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
514                                            ret, tdb_errorstr(tdb)));
515                         talloc_free(new_reason);
516                         return -1;
517                 }
518                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
519                                    ctdb_db->db_name, new_reason));
520         } else if (old_reason) {
521                 ret = tdb_delete(tdb, key);
522                 if (ret != 0) {
523                         tdb_transaction_cancel(tdb);
524                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
525                                            tdb_name(tdb), ctdb_db->db_name,
526                                            ret, tdb_errorstr(tdb)));
527                         talloc_free(new_reason);
528                         return -1;
529                 }
530                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
531                                    ctdb_db->db_name));
532         }
533
534         ret = tdb_transaction_commit(tdb);
535         if (ret != TDB_SUCCESS) {
536                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
537                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
538                 talloc_free(new_reason);
539                 return -1;
540         }
541
542         talloc_free(old_reason);
543         ctdb_db->unhealthy_reason = new_reason;
544
545         return 0;
546 }
547
548 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
549                                      struct ctdb_db_context *ctdb_db)
550 {
551         time_t now = time(NULL);
552         char *new_path;
553         char *new_reason;
554         int ret;
555         struct tm *tm;
556
557         tm = gmtime(&now);
558
559         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
560         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
561                                    "%04u%02u%02u%02u%02u%02u.0Z",
562                                    ctdb_db->db_path,
563                                    tm->tm_year+1900, tm->tm_mon+1,
564                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
565                                    tm->tm_sec);
566         if (new_path == NULL) {
567                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
568                 return -1;
569         }
570
571         new_reason = talloc_asprintf(ctdb_db,
572                                      "ERROR - Backup of corrupted TDB in '%s'",
573                                      new_path);
574         if (new_reason == NULL) {
575                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
576                 return -1;
577         }
578         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
579         talloc_free(new_reason);
580         if (ret != 0) {
581                 DEBUG(DEBUG_CRIT,(__location__
582                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
583                                  ctdb_db->db_path));
584                 return -1;
585         }
586
587         ret = rename(ctdb_db->db_path, new_path);
588         if (ret != 0) {
589                 DEBUG(DEBUG_CRIT,(__location__
590                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
591                                   ctdb_db->db_path, new_path,
592                                   errno, strerror(errno)));
593                 talloc_free(new_path);
594                 return -1;
595         }
596
597         DEBUG(DEBUG_CRIT,(__location__
598                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
599                          ctdb_db->db_path, new_path));
600         talloc_free(new_path);
601         return 0;
602 }
603
604 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
605 {
606         struct ctdb_db_context *ctdb_db;
607         int ret;
608         int ok = 0;
609         int fail = 0;
610
611         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
612                 if (!ctdb_db->persistent) {
613                         continue;
614                 }
615
616                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
617                 if (ret != 0) {
618                         DEBUG(DEBUG_ALERT,(__location__
619                                            " load persistent health for '%s' failed\n",
620                                            ctdb_db->db_path));
621                         return -1;
622                 }
623
624                 if (ctdb_db->unhealthy_reason == NULL) {
625                         ok++;
626                         DEBUG(DEBUG_INFO,(__location__
627                                    " persistent db '%s' healthy\n",
628                                    ctdb_db->db_path));
629                         continue;
630                 }
631
632                 fail++;
633                 DEBUG(DEBUG_ALERT,(__location__
634                                    " persistent db '%s' unhealthy: %s\n",
635                                    ctdb_db->db_path,
636                                    ctdb_db->unhealthy_reason));
637         }
638         DEBUG(DEBUG_NOTICE,
639               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
640                ok, fail));
641
642         if (fail != 0) {
643                 return -1;
644         }
645
646         return 0;
647 }
648
649
650 /*
651   mark a database - as healthy
652  */
653 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
654 {
655         uint32_t db_id = *(uint32_t *)indata.dptr;
656         struct ctdb_db_context *ctdb_db;
657         int ret;
658         bool may_recover = false;
659
660         ctdb_db = find_ctdb_db(ctdb, db_id);
661         if (!ctdb_db) {
662                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
663                 return -1;
664         }
665
666         if (ctdb_db->unhealthy_reason) {
667                 may_recover = true;
668         }
669
670         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
671         if (ret != 0) {
672                 DEBUG(DEBUG_ERR,(__location__
673                                  " ctdb_update_persistent_health(%s) failed\n",
674                                  ctdb_db->db_name));
675                 return -1;
676         }
677
678         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
679                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
680                                   ctdb_db->db_name));
681                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
682         }
683
684         return 0;
685 }
686
687 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
688                                    TDB_DATA indata,
689                                    TDB_DATA *outdata)
690 {
691         uint32_t db_id = *(uint32_t *)indata.dptr;
692         struct ctdb_db_context *ctdb_db;
693         int ret;
694
695         ctdb_db = find_ctdb_db(ctdb, db_id);
696         if (!ctdb_db) {
697                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
698                 return -1;
699         }
700
701         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
702         if (ret != 0) {
703                 DEBUG(DEBUG_ERR,(__location__
704                                  " ctdb_load_persistent_health(%s) failed\n",
705                                  ctdb_db->db_name));
706                 return -1;
707         }
708
709         *outdata = tdb_null;
710         if (ctdb_db->unhealthy_reason) {
711                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
712                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
713         }
714
715         return 0;
716 }
717
718
719 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
720 {
721         char *ropath;
722
723         if (ctdb_db->readonly) {
724                 return 0;
725         }
726
727         if (ctdb_db->persistent) {
728                 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
729                 return -1;
730         }
731
732         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
733         if (ropath == NULL) {
734                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
735                 return -1;
736         }
737         ctdb_db->rottdb = tdb_open(ropath, 
738                               ctdb->tunable.database_hash_size, 
739                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
740                               O_CREAT|O_RDWR, 0600);
741         if (ctdb_db->rottdb == NULL) {
742                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
743                 talloc_free(ropath);
744                 return -1;
745         }
746
747         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
748
749         ctdb_db->readonly = true;
750
751         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
752
753         talloc_free(ropath);
754         return 0;
755 }
756
757 /*
758   attach to a database, handling both persistent and non-persistent databases
759   return 0 on success, -1 on failure
760  */
761 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
762                              bool persistent, const char *unhealthy_reason,
763                              bool jenkinshash, bool mutexes)
764 {
765         struct ctdb_db_context *ctdb_db, *tmp_db;
766         int ret;
767         struct TDB_DATA key;
768         unsigned tdb_flags;
769         int mode = 0600;
770         int remaining_tries = 0;
771
772         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
773         CTDB_NO_MEMORY(ctdb, ctdb_db);
774
775         ctdb_db->ctdb = ctdb;
776         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
777         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
778
779         key.dsize = strlen(db_name)+1;
780         key.dptr  = discard_const(db_name);
781         ctdb_db->db_id = ctdb_hash(&key);
782         ctdb_db->persistent = persistent;
783
784         if (!ctdb_db->persistent) {
785                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
786                 if (ctdb_db->delete_queue == NULL) {
787                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
788                 }
789
790                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
791         }
792
793         /* check for hash collisions */
794         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
795                 if (tmp_db->db_id == ctdb_db->db_id) {
796                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
797                                  tmp_db->db_id, db_name, tmp_db->db_name));
798                         talloc_free(ctdb_db);
799                         return -1;
800                 }
801         }
802
803         if (persistent) {
804                 if (unhealthy_reason) {
805                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
806                                                             unhealthy_reason, 0);
807                         if (ret != 0) {
808                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
809                                                    ctdb_db->db_name, unhealthy_reason, ret));
810                                 talloc_free(ctdb_db);
811                                 return -1;
812                         }
813                 }
814
815                 if (ctdb->max_persistent_check_errors > 0) {
816                         remaining_tries = 1;
817                 }
818                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
819                         remaining_tries = 0;
820                 }
821
822                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
823                 if (ret != 0) {
824                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
825                                    ctdb_db->db_name, ret));
826                         talloc_free(ctdb_db);
827                         return -1;
828                 }
829         }
830
831         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
832                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
833                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
834                 talloc_free(ctdb_db);
835                 return -1;
836         }
837
838         if (ctdb_db->unhealthy_reason) {
839                 /* this is just a warning, but we want that in the log file! */
840                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
841                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
842         }
843
844         /* open the database */
845         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
846                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
847                                            db_name, ctdb->pnn);
848
849         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
850         if (ctdb->valgrinding) {
851                 tdb_flags |= TDB_NOMMAP;
852         }
853         tdb_flags |= TDB_DISALLOW_NESTING;
854         if (jenkinshash) {
855                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
856         }
857 #ifdef TDB_MUTEX_LOCKING
858         if (ctdb->tunable.mutex_enabled && mutexes &&
859             tdb_runtime_check_for_robust_mutexes()) {
860                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
861         }
862 #endif
863
864 again:
865         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
866                                       ctdb->tunable.database_hash_size, 
867                                       tdb_flags, 
868                                       O_CREAT|O_RDWR, mode);
869         if (ctdb_db->ltdb == NULL) {
870                 struct stat st;
871                 int saved_errno = errno;
872
873                 if (!persistent) {
874                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
875                                           ctdb_db->db_path,
876                                           saved_errno,
877                                           strerror(saved_errno)));
878                         talloc_free(ctdb_db);
879                         return -1;
880                 }
881
882                 if (remaining_tries == 0) {
883                         DEBUG(DEBUG_CRIT,(__location__
884                                           "Failed to open persistent tdb '%s': %d - %s\n",
885                                           ctdb_db->db_path,
886                                           saved_errno,
887                                           strerror(saved_errno)));
888                         talloc_free(ctdb_db);
889                         return -1;
890                 }
891
892                 ret = stat(ctdb_db->db_path, &st);
893                 if (ret != 0) {
894                         DEBUG(DEBUG_CRIT,(__location__
895                                           "Failed to open persistent tdb '%s': %d - %s\n",
896                                           ctdb_db->db_path,
897                                           saved_errno,
898                                           strerror(saved_errno)));
899                         talloc_free(ctdb_db);
900                         return -1;
901                 }
902
903                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
904                 if (ret != 0) {
905                         DEBUG(DEBUG_CRIT,(__location__
906                                           "Failed to open persistent tdb '%s': %d - %s\n",
907                                           ctdb_db->db_path,
908                                           saved_errno,
909                                           strerror(saved_errno)));
910                         talloc_free(ctdb_db);
911                         return -1;
912                 }
913
914                 remaining_tries--;
915                 mode = st.st_mode;
916                 goto again;
917         }
918
919         if (!persistent) {
920                 ctdb_check_db_empty(ctdb_db);
921         } else {
922                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
923                 if (ret != 0) {
924                         int fd;
925                         struct stat st;
926
927                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
928                                           ctdb_db->db_path, ret,
929                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
930                         if (remaining_tries == 0) {
931                                 talloc_free(ctdb_db);
932                                 return -1;
933                         }
934
935                         fd = tdb_fd(ctdb_db->ltdb->tdb);
936                         ret = fstat(fd, &st);
937                         if (ret != 0) {
938                                 DEBUG(DEBUG_CRIT,(__location__
939                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
940                                                   ctdb_db->db_path,
941                                                   errno,
942                                                   strerror(errno)));
943                                 talloc_free(ctdb_db);
944                                 return -1;
945                         }
946
947                         /* close the TDB */
948                         talloc_free(ctdb_db->ltdb);
949                         ctdb_db->ltdb = NULL;
950
951                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
952                         if (ret != 0) {
953                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
954                                                   ctdb_db->db_path));
955                                 talloc_free(ctdb_db);
956                                 return -1;
957                         }
958
959                         remaining_tries--;
960                         mode = st.st_mode;
961                         goto again;
962                 }
963         }
964
965         /* set up a rb tree we can use to track which records we have a 
966            fetch-lock in-flight for so we can defer any additional calls
967            for the same record.
968          */
969         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
970         if (ctdb_db->deferred_fetch == NULL) {
971                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
972                 talloc_free(ctdb_db);
973                 return -1;
974         }
975
976         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
977         if (ctdb_db->defer_dmaster == NULL) {
978                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
979                                   ctdb_db->db_name));
980                 talloc_free(ctdb_db);
981                 return -1;
982         }
983
984         DLIST_ADD(ctdb->db_list, ctdb_db);
985
986         /* setting this can help some high churn databases */
987         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
988
989         /* 
990            all databases support the "null" function. we need this in
991            order to do forced migration of records
992         */
993         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
994         if (ret != 0) {
995                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
996                 talloc_free(ctdb_db);
997                 return -1;
998         }
999
1000         /* 
1001            all databases support the "fetch" function. we need this
1002            for efficient Samba3 ctdb fetch
1003         */
1004         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1005         if (ret != 0) {
1006                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1007                 talloc_free(ctdb_db);
1008                 return -1;
1009         }
1010
1011         /* 
1012            all databases support the "fetch_with_header" function. we need this
1013            for efficient readonly record fetches
1014         */
1015         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1016         if (ret != 0) {
1017                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1018                 talloc_free(ctdb_db);
1019                 return -1;
1020         }
1021
1022         ret = ctdb_vacuum_init(ctdb_db);
1023         if (ret != 0) {
1024                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1025                                   "database '%s'\n", ctdb_db->db_name));
1026                 talloc_free(ctdb_db);
1027                 return -1;
1028         }
1029
1030         ret = ctdb_migration_init(ctdb_db);
1031         if (ret != 0) {
1032                 DEBUG(DEBUG_ERR,
1033                       ("Failed to setup migration tracking for db '%s'\n",
1034                        ctdb_db->db_name));
1035                 talloc_free(ctdb_db);
1036                 return -1;
1037         }
1038
1039         ctdb_db->generation = ctdb->vnn_map->generation;
1040
1041         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1042                             ctdb_db->db_path, tdb_flags));
1043
1044         /* success */
1045         return 0;
1046 }
1047
1048
/*
  state kept for a DB attach control whose processing has been deferred
 */
struct ctdb_deferred_attach_context {
        /* linkage for the ctdb->deferred_attach list */
        struct ctdb_deferred_attach_context *next;
        struct ctdb_deferred_attach_context *prev;

        /* daemon context owning the deferred_attach list */
        struct ctdb_context *ctdb;

        /* the deferred control request packet */
        struct ctdb_req_control_old *c;
};
1054
1055
1056 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1057 {
1058         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1059
1060         return 0;
1061 }
1062
1063 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1064                                          struct tevent_timer *te,
1065                                          struct timeval t, void *private_data)
1066 {
1067         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1068         struct ctdb_context *ctdb = da_ctx->ctdb;
1069
1070         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1071         talloc_free(da_ctx);
1072 }
1073
1074 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1075                                           struct tevent_timer *te,
1076                                           struct timeval t, void *private_data)
1077 {
1078         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1079         struct ctdb_context *ctdb = da_ctx->ctdb;
1080
1081         /* This talloc-steals the packet ->c */
1082         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1083         talloc_free(da_ctx);
1084 }
1085
1086 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1087 {
1088         struct ctdb_deferred_attach_context *da_ctx;
1089
1090         /* call it from the main event loop as soon as the current event 
1091            finishes.
1092          */
1093         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1094                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1095                 tevent_add_timer(ctdb->ev, da_ctx,
1096                                  timeval_current_ofs(1,0),
1097                                  ctdb_deferred_attach_callback, da_ctx);
1098         }
1099
1100         return 0;
1101 }
1102
1103 /*
1104   a client has asked to attach a new database
1105  */
1106 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1107                                TDB_DATA *outdata, uint64_t tdb_flags, 
1108                                bool persistent, uint32_t client_id,
1109                                struct ctdb_req_control_old *c,
1110                                bool *async_reply)
1111 {
1112         const char *db_name = (const char *)indata.dptr;
1113         struct ctdb_db_context *db;
1114         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1115         struct ctdb_client *client = NULL;
1116         bool with_jenkinshash, with_mutexes;
1117
1118         if (ctdb->tunable.allow_client_db_attach == 0) {
1119                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1120                                   "AllowClientDBAccess == 0\n", db_name));
1121                 return -1;
1122         }
1123
1124         /* don't allow any local clients to attach while we are in recovery mode
1125          * except for the recovery daemon.
1126          * allow all attach from the network since these are always from remote
1127          * recovery daemons.
1128          */
1129         if (client_id != 0) {
1130                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1131         }
1132         if (client != NULL) {
1133                 /* If the node is inactive it is not part of the cluster
1134                    and we should not allow clients to attach to any
1135                    databases
1136                 */
1137                 if (node->flags & NODE_FLAGS_INACTIVE) {
1138                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1139                         return -1;
1140                 }
1141
1142                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1143                     client->pid != ctdb->recoverd_pid &&
1144                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1145                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1146
1147                         if (da_ctx == NULL) {
1148                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1149                                 return -1;
1150                         }
1151
1152                         da_ctx->ctdb = ctdb;
1153                         da_ctx->c = talloc_steal(da_ctx, c);
1154                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1155                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1156
1157                         tevent_add_timer(ctdb->ev, da_ctx,
1158                                          timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1159                                          ctdb_deferred_attach_timeout, da_ctx);
1160
1161                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1162                         *async_reply = true;
1163                         return 0;
1164                 }
1165         }
1166
1167         /* the client can optionally pass additional tdb flags, but we
1168            only allow a subset of those on the database in ctdb. Note
1169            that tdb_flags is passed in via the (otherwise unused)
1170            srvid to the attach control */
1171 #ifdef TDB_MUTEX_LOCKING
1172         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1173 #else
1174         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1175 #endif
1176
1177         /* see if we already have this name */
1178         db = ctdb_db_handle(ctdb, db_name);
1179         if (db) {
1180                 if (db->persistent != persistent) {
1181                         DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1182                                           "database %s\n", persistent ? "" : "non-",
1183                                           db-> persistent ? "" : "non-", db_name));
1184                         return -1;
1185                 }
1186                 outdata->dptr  = (uint8_t *)&db->db_id;
1187                 outdata->dsize = sizeof(db->db_id);
1188                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1189                 return 0;
1190         }
1191
1192         with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1193 #ifdef TDB_MUTEX_LOCKING
1194         with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1195 #else
1196         with_mutexes = false;
1197 #endif
1198
1199         if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1200                               with_jenkinshash, with_mutexes) != 0) {
1201                 return -1;
1202         }
1203
1204         db = ctdb_db_handle(ctdb, db_name);
1205         if (!db) {
1206                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1207                 return -1;
1208         }
1209
1210         /* remember the flags the client has specified */
1211         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1212
1213         outdata->dptr  = (uint8_t *)&db->db_id;
1214         outdata->dsize = sizeof(db->db_id);
1215
1216         /* Try to ensure it's locked in mem */
1217         lockdown_memory(ctdb->valgrinding);
1218
1219         /* tell all the other nodes about this database */
1220         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1221                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1222                                                 CTDB_CONTROL_DB_ATTACH,
1223                                  0, CTDB_CTRL_FLAG_NOREPLY,
1224                                  indata, NULL, NULL);
1225
1226         /* success */
1227         return 0;
1228 }
1229
1230 /*
1231  * a client has asked to detach from a database
1232  */
1233 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1234                                uint32_t client_id)
1235 {
1236         uint32_t db_id;
1237         struct ctdb_db_context *ctdb_db;
1238         struct ctdb_client *client = NULL;
1239
1240         db_id = *(uint32_t *)indata.dptr;
1241         ctdb_db = find_ctdb_db(ctdb, db_id);
1242         if (ctdb_db == NULL) {
1243                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1244                                   db_id));
1245                 return -1;
1246         }
1247
1248         if (ctdb->tunable.allow_client_db_attach == 1) {
1249                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1250                                   "Clients are allowed access to databases "
1251                                   "(AllowClientDBAccess == 1)\n",
1252                                   ctdb_db->db_name));
1253                 return -1;
1254         }
1255
1256         if (ctdb_db->persistent) {
1257                 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1258                                   "denied\n", ctdb_db->db_name));
1259                 return -1;
1260         }
1261
1262         /* Cannot detach from database when in recovery */
1263         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1264                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1265                 return -1;
1266         }
1267
1268         /* If a control comes from a client, then broadcast it to all nodes.
1269          * Do the actual detach only if the control comes from other daemons.
1270          */
1271         if (client_id != 0) {
1272                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1273                 if (client != NULL) {
1274                         /* forward the control to all the nodes */
1275                         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1276                                                  CTDB_CONTROL_DB_DETACH, 0,
1277                                                  CTDB_CTRL_FLAG_NOREPLY,
1278                                                  indata, NULL, NULL);
1279                         return 0;
1280                 }
1281                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1282                                   "for database '%s'\n", ctdb_db->db_name));
1283                 return -1;
1284         }
1285
1286         /* Detach database from recoverd */
1287         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1288                                      CTDB_SRVID_DETACH_DATABASE,
1289                                      indata) != 0) {
1290                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1291                 return -1;
1292         }
1293
1294         /* Disable vacuuming and drop all vacuuming data */
1295         talloc_free(ctdb_db->vacuum_handle);
1296         talloc_free(ctdb_db->delete_queue);
1297
1298         /* Terminate any deferred fetch */
1299         talloc_free(ctdb_db->deferred_fetch);
1300
1301         /* Terminate any traverses */
1302         while (ctdb_db->traverse) {
1303                 talloc_free(ctdb_db->traverse);
1304         }
1305
1306         /* Terminate any revokes */
1307         while (ctdb_db->revokechild_active) {
1308                 talloc_free(ctdb_db->revokechild_active);
1309         }
1310
1311         /* Free readonly tracking database */
1312         if (ctdb_db->readonly) {
1313                 talloc_free(ctdb_db->rottdb);
1314         }
1315
1316         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1317
1318         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1319                              ctdb_db->db_name));
1320         talloc_free(ctdb_db);
1321
1322         return 0;
1323 }
1324
1325 /*
1326   attach to all existing persistent databases
1327  */
1328 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1329                                   const char *unhealthy_reason)
1330 {
1331         DIR *d;
1332         struct dirent *de;
1333
1334         /* open the persistent db directory and scan it for files */
1335         d = opendir(ctdb->db_directory_persistent);
1336         if (d == NULL) {
1337                 return 0;
1338         }
1339
1340         while ((de=readdir(d))) {
1341                 char *p, *s, *q;
1342                 size_t len = strlen(de->d_name);
1343                 uint32_t node;
1344                 int invalid_name = 0;
1345                 
1346                 s = talloc_strdup(ctdb, de->d_name);
1347                 if (s == NULL) {
1348                         closedir(d);
1349                         CTDB_NO_MEMORY(ctdb, s);
1350                 }
1351
1352                 /* only accept names ending in .tdb */
1353                 p = strstr(s, ".tdb.");
1354                 if (len < 7 || p == NULL) {
1355                         talloc_free(s);
1356                         continue;
1357                 }
1358
1359                 /* only accept names ending with .tdb. and any number of digits */
1360                 q = p+5;
1361                 while (*q != 0 && invalid_name == 0) {
1362                         if (!isdigit(*q++)) {
1363                                 invalid_name = 1;
1364                         }
1365                 }
1366                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1367                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1368                         talloc_free(s);
1369                         continue;
1370                 }
1371                 p[4] = 0;
1372
1373                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1374                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1375                         closedir(d);
1376                         talloc_free(s);
1377                         return -1;
1378                 }
1379
1380                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1381
1382                 talloc_free(s);
1383         }
1384         closedir(d);
1385         return 0;
1386 }
1387
1388 int ctdb_attach_databases(struct ctdb_context *ctdb)
1389 {
1390         int ret;
1391         char *persistent_health_path = NULL;
1392         char *unhealthy_reason = NULL;
1393         bool first_try = true;
1394
1395         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1396                                                  ctdb->db_directory_state,
1397                                                  PERSISTENT_HEALTH_TDB,
1398                                                  ctdb->pnn);
1399         if (persistent_health_path == NULL) {
1400                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1401                 return -1;
1402         }
1403
1404 again:
1405
1406         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1407                                                    0, TDB_DISALLOW_NESTING,
1408                                                    O_CREAT | O_RDWR, 0600);
1409         if (ctdb->db_persistent_health == NULL) {
1410                 struct tdb_wrap *tdb;
1411
1412                 if (!first_try) {
1413                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1414                                           persistent_health_path,
1415                                           errno,
1416                                           strerror(errno)));
1417                         talloc_free(persistent_health_path);
1418                         talloc_free(unhealthy_reason);
1419                         return -1;
1420                 }
1421                 first_try = false;
1422
1423                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1424                                                    persistent_health_path,
1425                                                    "was cleared after a failure",
1426                                                    "manual verification needed");
1427                 if (unhealthy_reason == NULL) {
1428                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1429                         talloc_free(persistent_health_path);
1430                         return -1;
1431                 }
1432
1433                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1434                                   persistent_health_path));
1435                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1436                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1437                                     O_CREAT | O_RDWR, 0600);
1438                 if (tdb) {
1439                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1440                                           persistent_health_path,
1441                                           errno,
1442                                           strerror(errno)));
1443                         talloc_free(persistent_health_path);
1444                         talloc_free(unhealthy_reason);
1445                         return -1;
1446                 }
1447
1448                 talloc_free(tdb);
1449                 goto again;
1450         }
1451         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1452         if (ret != 0) {
1453                 struct tdb_wrap *tdb;
1454
1455                 talloc_free(ctdb->db_persistent_health);
1456                 ctdb->db_persistent_health = NULL;
1457
1458                 if (!first_try) {
1459                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1460                                           persistent_health_path));
1461                         talloc_free(persistent_health_path);
1462                         talloc_free(unhealthy_reason);
1463                         return -1;
1464                 }
1465                 first_try = false;
1466
1467                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1468                                                    persistent_health_path,
1469                                                    "was cleared after a failure",
1470                                                    "manual verification needed");
1471                 if (unhealthy_reason == NULL) {
1472                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1473                         talloc_free(persistent_health_path);
1474                         return -1;
1475                 }
1476
1477                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1478                                   persistent_health_path));
1479                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1480                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1481                                     O_CREAT | O_RDWR, 0600);
1482                 if (tdb) {
1483                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1484                                           persistent_health_path,
1485                                           errno,
1486                                           strerror(errno)));
1487                         talloc_free(persistent_health_path);
1488                         talloc_free(unhealthy_reason);
1489                         return -1;
1490                 }
1491
1492                 talloc_free(tdb);
1493                 goto again;
1494         }
1495         talloc_free(persistent_health_path);
1496
1497         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1498         talloc_free(unhealthy_reason);
1499         if (ret != 0) {
1500                 return ret;
1501         }
1502
1503         return 0;
1504 }
1505
1506 /*
1507   called when a broadcast seqnum update comes in
1508  */
1509 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1510 {
1511         struct ctdb_db_context *ctdb_db;
1512         if (srcnode == ctdb->pnn) {
1513                 /* don't update ourselves! */
1514                 return 0;
1515         }
1516
1517         ctdb_db = find_ctdb_db(ctdb, db_id);
1518         if (!ctdb_db) {
1519                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1520                 return -1;
1521         }
1522
1523         if (ctdb_db->unhealthy_reason) {
1524                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1525                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1526                 return -1;
1527         }
1528
1529         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1530         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1531         return 0;
1532 }
1533
1534 /*
1535   timer to check for seqnum changes in a ltdb and propogate them
1536  */
1537 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1538                                    struct tevent_timer *te,
1539                                    struct timeval t, void *p)
1540 {
1541         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1542         struct ctdb_context *ctdb = ctdb_db->ctdb;
1543         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1544         if (new_seqnum != ctdb_db->seqnum) {
1545                 /* something has changed - propogate it */
1546                 TDB_DATA data;
1547                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1548                 data.dsize = sizeof(uint32_t);
1549                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1550                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1551                                          data, NULL, NULL);             
1552         }
1553         ctdb_db->seqnum = new_seqnum;
1554
1555         /* setup a new timer */
1556         ctdb_db->seqnum_update =
1557                 tevent_add_timer(ctdb->ev, ctdb_db,
1558                                  timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1559                                                      (ctdb->tunable.seqnum_interval%1000)*1000),
1560                                  ctdb_ltdb_seqnum_check, ctdb_db);
1561 }
1562
1563 /*
1564   enable seqnum handling on this db
1565  */
1566 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1567 {
1568         struct ctdb_db_context *ctdb_db;
1569         ctdb_db = find_ctdb_db(ctdb, db_id);
1570         if (!ctdb_db) {
1571                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1572                 return -1;
1573         }
1574
1575         if (ctdb_db->seqnum_update == NULL) {
1576                 ctdb_db->seqnum_update = tevent_add_timer(
1577                         ctdb->ev, ctdb_db,
1578                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1579                                             (ctdb->tunable.seqnum_interval%1000)*1000),
1580                         ctdb_ltdb_seqnum_check, ctdb_db);
1581         }
1582
1583         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1584         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1585         return 0;
1586 }
1587
1588 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1589 {
1590         if (ctdb_db->sticky) {
1591                 return 0;
1592         }
1593
1594         if (ctdb_db->persistent) {
1595                 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1596                 return -1;
1597         }
1598
1599         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1600
1601         ctdb_db->sticky = true;
1602
1603         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1604
1605         return 0;
1606 }
1607
1608 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1609 {
1610         struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1611         int i;
1612
1613         for (i=0; i<MAX_HOT_KEYS; i++) {
1614                 if (s->hot_keys[i].key.dsize > 0) {
1615                         talloc_free(s->hot_keys[i].key.dptr);
1616                 }
1617         }
1618
1619         ZERO_STRUCT(ctdb_db->statistics);
1620 }
1621
1622 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1623                                 uint32_t db_id,
1624                                 TDB_DATA *outdata)
1625 {
1626         struct ctdb_db_context *ctdb_db;
1627         struct ctdb_db_statistics_old *stats;
1628         int i;
1629         int len;
1630         char *ptr;
1631
1632         ctdb_db = find_ctdb_db(ctdb, db_id);
1633         if (!ctdb_db) {
1634                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1635                 return -1;
1636         }
1637
1638         len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1639         for (i = 0; i < MAX_HOT_KEYS; i++) {
1640                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1641         }
1642
1643         stats = talloc_size(outdata, len);
1644         if (stats == NULL) {
1645                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1646                 return -1;
1647         }
1648
1649         memcpy(stats, &ctdb_db->statistics,
1650                offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1651
1652         stats->num_hot_keys = MAX_HOT_KEYS;
1653
1654         ptr = &stats->hot_keys_wire[0];
1655         for (i = 0; i < MAX_HOT_KEYS; i++) {
1656                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1657                        ctdb_db->statistics.hot_keys[i].key.dsize);
1658                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1659         }
1660
1661         outdata->dptr  = (uint8_t *)stats;
1662         outdata->dsize = len;
1663
1664         return 0;
1665 }