ctdb-daemon: Simplify code using tdb_storev
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
45
46 /**
47  * write a record to a normal database
48  *
49  * This is the server-variant of the ctdb_ltdb_store function.
50  * It contains logic to determine whether a record should be
51  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52  * controls to the local ctdb daemon if apporpriate.
53  */
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
55                                   TDB_DATA key,
56                                   struct ctdb_ltdb_header *header,
57                                   TDB_DATA data)
58 {
59         struct ctdb_context *ctdb = ctdb_db->ctdb;
60         TDB_DATA rec[2];
61         uint32_t hsize = sizeof(struct ctdb_ltdb_header);
62         int ret;
63         bool seqnum_suppressed = false;
64         bool keep = false;
65         bool schedule_for_deletion = false;
66         bool remove_from_delete_queue = false;
67         uint32_t lmaster;
68
69         if (ctdb->flags & CTDB_FLAG_TORTURE) {
70                 TDB_DATA old;
71                 struct ctdb_ltdb_header *h2;
72
73                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74                 h2 = (struct ctdb_ltdb_header *)old.dptr;
75                 if (old.dptr != NULL &&
76                     old.dsize >= hsize &&
77                     h2->rsn > header->rsn) {
78                         DEBUG(DEBUG_ERR,
79                               ("RSN regression! %"PRIu64" %"PRIu64"\n",
80                                h2->rsn, header->rsn));
81                 }
82                 if (old.dptr) {
83                         free(old.dptr);
84                 }
85         }
86
87         if (ctdb->vnn_map == NULL) {
88                 /*
89                  * Called from a client: always store the record
90                  * Also don't call ctdb_lmaster since it uses the vnn_map!
91                  */
92                 keep = true;
93                 goto store;
94         }
95
96         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
97
98         /*
99          * If we migrate an empty record off to another node
100          * and the record has not been migrated with data,
101          * delete the record instead of storing the empty record.
102          */
103         if (data.dsize != 0) {
104                 keep = true;
105         } else if (header->flags & CTDB_REC_RO_FLAGS) {
106                 keep = true;
107         } else if (ctdb_db->persistent) {
108                 keep = true;
109         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
110                 /*
111                  * The record is not created by the client but
112                  * automatically by the ctdb_ltdb_fetch logic that
113                  * creates a record with an initial header in the
114                  * ltdb before trying to migrate the record from
115                  * the current lmaster. Keep it instead of trying
116                  * to delete the non-existing record...
117                  */
118                 keep = true;
119                 schedule_for_deletion = true;
120         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
121                 keep = true;
122         } else if (ctdb_db->ctdb->pnn == lmaster) {
123                 /*
124                  * If we are lmaster, then we usually keep the record.
125                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
126                  * and the record is empty and has never been migrated
127                  * with data, then we should delete it instead of storing it.
128                  * This is part of the vacuuming process.
129                  *
130                  * The reason that we usually need to store even empty records
131                  * on the lmaster is that a client operating directly on the
132                  * lmaster (== dmaster) expects the local copy of the record to
133                  * exist after successful ctdb migrate call. If the record does
134                  * not exist, the client goes into a migrate loop and eventually
135                  * fails. So storing the empty record makes sure that we do not
136                  * need to change the client code.
137                  */
138                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
139                         keep = true;
140                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
141                         keep = true;
142                 }
143         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
144                 keep = true;
145         }
146
147         if (keep) {
148                 if (!ctdb_db->persistent &&
149                     (ctdb_db->ctdb->pnn == header->dmaster) &&
150                     !(header->flags & CTDB_REC_RO_FLAGS))
151                 {
152                         header->rsn++;
153
154                         if (data.dsize == 0) {
155                                 schedule_for_deletion = true;
156                         }
157                 }
158                 remove_from_delete_queue = !schedule_for_deletion;
159         }
160
161 store:
162         /*
163          * The VACUUM_MIGRATED flag is only set temporarily for
164          * the above logic when the record was retrieved by a
165          * VACUUM_MIGRATE call and should not be stored in the
166          * database.
167          *
168          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
169          * and there are two cases in which the corresponding record
170          * is stored in the local database:
171          * 1. The record has been migrated with data in the past
172          *    (the MIGRATED_WITH_DATA record flag is set).
173          * 2. The record has been filled with data again since it
174          *    had been submitted in the VACUUM_FETCH message to the
175          *    lmaster.
176          * For such records it is important to not store the
177          * VACUUM_MIGRATED flag in the database.
178          */
179         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
180
181         /*
182          * Similarly, clear the AUTOMATIC flag which should not enter
183          * the local database copy since this would require client
184          * modifications to clear the flag when the client stores
185          * the record.
186          */
187         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
188
189         rec[0].dsize = hsize;
190         rec[0].dptr = (uint8_t *)header;
191
192         rec[1].dsize = data.dsize;
193         rec[1].dptr = data.dptr;
194
195         /* Databases with seqnum updates enabled only get their seqnum
196            changes when/if we modify the data */
197         if (ctdb_db->seqnum_update != NULL) {
198                 TDB_DATA old;
199                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
200
201                 if ((old.dsize == hsize + data.dsize) &&
202                     memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
203                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
204                         seqnum_suppressed = true;
205                 }
206                 if (old.dptr != NULL) {
207                         free(old.dptr);
208                 }
209         }
210
211         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
212                             ctdb_db->db_name,
213                             keep?"storing":"deleting",
214                             ctdb_hash(&key)));
215
216         if (keep) {
217                 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
218         } else {
219                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
220         }
221
222         if (ret != 0) {
223                 int lvl = DEBUG_ERR;
224
225                 if (keep == false &&
226                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
227                 {
228                         lvl = DEBUG_DEBUG;
229                 }
230
231                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
232                             "%d - %s\n",
233                             ctdb_db->db_name,
234                             keep?"store":"delete", ret,
235                             tdb_errorstr(ctdb_db->ltdb->tdb)));
236
237                 schedule_for_deletion = false;
238                 remove_from_delete_queue = false;
239         }
240         if (seqnum_suppressed) {
241                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
242         }
243
244         if (schedule_for_deletion) {
245                 int ret2;
246                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
247                 if (ret2 != 0) {
248                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
249                 }
250         }
251
252         if (remove_from_delete_queue) {
253                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
254         }
255
256         return ret;
257 }
258
/*
  state carried from ctdb_ltdb_lock_requeue() to lock_fetch_callback()
  while a request waits for a record chainlock
 */
struct lock_fetch_state {
	/* daemon context of the database being locked */
	struct ctdb_context *ctdb;
	/* database whose record is being locked */
	struct ctdb_db_context *ctdb_db;
	/* packet handler the deferred request is re-submitted to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	/* first argument passed to recv_pkt */
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* database generation recorded when the request was deferred */
	uint32_t generation;
	/* when true, re-submit even if the generation has changed */
	bool ignore_generation;
};
268
269 /*
270   called when we should retry the operation
271  */
272 static void lock_fetch_callback(void *p, bool locked)
273 {
274         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
275         if (!state->ignore_generation &&
276             state->generation != state->ctdb_db->generation) {
277                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
278                 talloc_free(state->hdr);
279                 return;
280         }
281         state->recv_pkt(state->recv_context, state->hdr);
282         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
283 }
284
285
286 /*
287   do a non-blocking ltdb_lock, deferring this ctdb request until we
288   have the chainlock
289
290   It does the following:
291
292    1) tries to get the chainlock. If it succeeds, then it returns 0
293
294    2) if it fails to get a chainlock immediately then it sets up a
295    non-blocking chainlock via ctdb_lock_record, and when it gets the
296    chainlock it re-submits this ctdb request to the main packet
297    receive function.
298
299    This effectively queues all ctdb requests that cannot be
300    immediately satisfied until it can get the lock. This means that
301    the main ctdb daemon will not block waiting for a chainlock held by
302    a client
303
304    There are 3 possible return values:
305
306        0:    means that it got the lock immediately.
307       -1:    means that it failed to get the lock, and won't retry
308       -2:    means that it failed to get the lock immediately, but will retry
309  */
310 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
311                            TDB_DATA key, struct ctdb_req_header *hdr,
312                            void (*recv_pkt)(void *, struct ctdb_req_header *),
313                            void *recv_context, bool ignore_generation)
314 {
315         int ret;
316         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
317         struct lock_request *lreq;
318         struct lock_fetch_state *state;
319         
320         ret = tdb_chainlock_nonblock(tdb, key);
321
322         if (ret != 0 &&
323             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
324                 /* a hard failure - don't try again */
325                 return -1;
326         }
327
328         /* when torturing, ensure we test the contended path */
329         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
330             random() % 5 == 0) {
331                 ret = -1;
332                 tdb_chainunlock(tdb, key);
333         }
334
335         /* first the non-contended path */
336         if (ret == 0) {
337                 return 0;
338         }
339
340         state = talloc(hdr, struct lock_fetch_state);
341         state->ctdb = ctdb_db->ctdb;
342         state->ctdb_db = ctdb_db;
343         state->hdr = hdr;
344         state->recv_pkt = recv_pkt;
345         state->recv_context = recv_context;
346         state->generation = ctdb_db->generation;
347         state->ignore_generation = ignore_generation;
348
349         /* now the contended path */
350         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
351         if (lreq == NULL) {
352                 return -1;
353         }
354
355         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
356            so it won't be freed yet */
357         talloc_steal(state, hdr);
358
359         /* now tell the caller than we will retry asynchronously */
360         return -2;
361 }
362
363 /*
364   a varient of ctdb_ltdb_lock_requeue that also fetches the record
365  */
366 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
367                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
368                                  struct ctdb_req_header *hdr, TDB_DATA *data,
369                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
370                                  void *recv_context, bool ignore_generation)
371 {
372         int ret;
373
374         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
375                                      recv_context, ignore_generation);
376         if (ret == 0) {
377                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
378                 if (ret != 0) {
379                         int uret;
380                         uret = ctdb_ltdb_unlock(ctdb_db, key);
381                         if (uret != 0) {
382                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
383                         }
384                 }
385         }
386         return ret;
387 }
388
389
390 /*
391   paraoid check to see if the db is empty
392  */
393 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
394 {
395         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
396         int count = tdb_traverse_read(tdb, NULL, NULL);
397         if (count != 0) {
398                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
399                          ctdb_db->db_path));
400                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
401         }
402 }
403
404 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
405                                 struct ctdb_db_context *ctdb_db)
406 {
407         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
408         char *old;
409         char *reason = NULL;
410         TDB_DATA key;
411         TDB_DATA val;
412
413         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
414         key.dsize = strlen(ctdb_db->db_name);
415
416         old = ctdb_db->unhealthy_reason;
417         ctdb_db->unhealthy_reason = NULL;
418
419         val = tdb_fetch(tdb, key);
420         if (val.dsize > 0) {
421                 reason = talloc_strndup(ctdb_db,
422                                         (const char *)val.dptr,
423                                         val.dsize);
424                 if (reason == NULL) {
425                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
426                                            (int)val.dsize));
427                         ctdb_db->unhealthy_reason = old;
428                         free(val.dptr);
429                         return -1;
430                 }
431         }
432
433         if (val.dptr) {
434                 free(val.dptr);
435         }
436
437         talloc_free(old);
438         ctdb_db->unhealthy_reason = reason;
439         return 0;
440 }
441
442 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
443                                   struct ctdb_db_context *ctdb_db,
444                                   const char *given_reason,/* NULL means healthy */
445                                   int num_healthy_nodes)
446 {
447         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
448         int ret;
449         TDB_DATA key;
450         TDB_DATA val;
451         char *new_reason = NULL;
452         char *old_reason = NULL;
453
454         ret = tdb_transaction_start(tdb);
455         if (ret != 0) {
456                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
457                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
458                 return -1;
459         }
460
461         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
462         if (ret != 0) {
463                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
464                                    ctdb_db->db_name, ret));
465                 return -1;
466         }
467         old_reason = ctdb_db->unhealthy_reason;
468
469         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
470         key.dsize = strlen(ctdb_db->db_name);
471
472         if (given_reason) {
473                 new_reason = talloc_strdup(ctdb_db, given_reason);
474                 if (new_reason == NULL) {
475                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
476                                           given_reason));
477                         return -1;
478                 }
479         } else if (old_reason && num_healthy_nodes == 0) {
480                 /*
481                  * If the reason indicates ok, but there where no healthy nodes
482                  * available, that it means, we have not recovered valid content
483                  * of the db. So if there's an old reason, prefix it with
484                  * "NO-HEALTHY-NODES - "
485                  */
486                 const char *prefix;
487
488 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
489                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
490                 if (ret != 0) {
491                         prefix = _TMP_PREFIX;
492                 } else {
493                         prefix = "";
494                 }
495                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
496                                          prefix, old_reason);
497                 if (new_reason == NULL) {
498                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
499                                           prefix, old_reason));
500                         return -1;
501                 }
502 #undef _TMP_PREFIX
503         }
504
505         if (new_reason) {
506                 val.dptr = discard_const_p(uint8_t, new_reason);
507                 val.dsize = strlen(new_reason);
508
509                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
510                 if (ret != 0) {
511                         tdb_transaction_cancel(tdb);
512                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
513                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
514                                            ret, tdb_errorstr(tdb)));
515                         talloc_free(new_reason);
516                         return -1;
517                 }
518                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
519                                    ctdb_db->db_name, new_reason));
520         } else if (old_reason) {
521                 ret = tdb_delete(tdb, key);
522                 if (ret != 0) {
523                         tdb_transaction_cancel(tdb);
524                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
525                                            tdb_name(tdb), ctdb_db->db_name,
526                                            ret, tdb_errorstr(tdb)));
527                         talloc_free(new_reason);
528                         return -1;
529                 }
530                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
531                                    ctdb_db->db_name));
532         }
533
534         ret = tdb_transaction_commit(tdb);
535         if (ret != TDB_SUCCESS) {
536                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
537                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
538                 talloc_free(new_reason);
539                 return -1;
540         }
541
542         talloc_free(old_reason);
543         ctdb_db->unhealthy_reason = new_reason;
544
545         return 0;
546 }
547
548 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
549                                      struct ctdb_db_context *ctdb_db)
550 {
551         time_t now = time(NULL);
552         char *new_path;
553         char *new_reason;
554         int ret;
555         struct tm *tm;
556
557         tm = gmtime(&now);
558
559         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
560         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
561                                    "%04u%02u%02u%02u%02u%02u.0Z",
562                                    ctdb_db->db_path,
563                                    tm->tm_year+1900, tm->tm_mon+1,
564                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
565                                    tm->tm_sec);
566         if (new_path == NULL) {
567                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
568                 return -1;
569         }
570
571         new_reason = talloc_asprintf(ctdb_db,
572                                      "ERROR - Backup of corrupted TDB in '%s'",
573                                      new_path);
574         if (new_reason == NULL) {
575                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
576                 return -1;
577         }
578         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
579         talloc_free(new_reason);
580         if (ret != 0) {
581                 DEBUG(DEBUG_CRIT,(__location__
582                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
583                                  ctdb_db->db_path));
584                 return -1;
585         }
586
587         ret = rename(ctdb_db->db_path, new_path);
588         if (ret != 0) {
589                 DEBUG(DEBUG_CRIT,(__location__
590                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
591                                   ctdb_db->db_path, new_path,
592                                   errno, strerror(errno)));
593                 talloc_free(new_path);
594                 return -1;
595         }
596
597         DEBUG(DEBUG_CRIT,(__location__
598                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
599                          ctdb_db->db_path, new_path));
600         talloc_free(new_path);
601         return 0;
602 }
603
604 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
605 {
606         struct ctdb_db_context *ctdb_db;
607         int ret;
608         int ok = 0;
609         int fail = 0;
610
611         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
612                 if (!ctdb_db->persistent) {
613                         continue;
614                 }
615
616                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
617                 if (ret != 0) {
618                         DEBUG(DEBUG_ALERT,(__location__
619                                            " load persistent health for '%s' failed\n",
620                                            ctdb_db->db_path));
621                         return -1;
622                 }
623
624                 if (ctdb_db->unhealthy_reason == NULL) {
625                         ok++;
626                         DEBUG(DEBUG_INFO,(__location__
627                                    " persistent db '%s' healthy\n",
628                                    ctdb_db->db_path));
629                         continue;
630                 }
631
632                 fail++;
633                 DEBUG(DEBUG_ALERT,(__location__
634                                    " persistent db '%s' unhealthy: %s\n",
635                                    ctdb_db->db_path,
636                                    ctdb_db->unhealthy_reason));
637         }
638         DEBUG(DEBUG_NOTICE,
639               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
640                ok, fail));
641
642         if (fail != 0) {
643                 return -1;
644         }
645
646         return 0;
647 }
648
649
650 /*
651   mark a database - as healthy
652  */
653 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
654 {
655         uint32_t db_id = *(uint32_t *)indata.dptr;
656         struct ctdb_db_context *ctdb_db;
657         int ret;
658         bool may_recover = false;
659
660         ctdb_db = find_ctdb_db(ctdb, db_id);
661         if (!ctdb_db) {
662                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
663                 return -1;
664         }
665
666         if (ctdb_db->unhealthy_reason) {
667                 may_recover = true;
668         }
669
670         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
671         if (ret != 0) {
672                 DEBUG(DEBUG_ERR,(__location__
673                                  " ctdb_update_persistent_health(%s) failed\n",
674                                  ctdb_db->db_name));
675                 return -1;
676         }
677
678         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
679                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
680                                   ctdb_db->db_name));
681                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
682         }
683
684         return 0;
685 }
686
687 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
688                                    TDB_DATA indata,
689                                    TDB_DATA *outdata)
690 {
691         uint32_t db_id = *(uint32_t *)indata.dptr;
692         struct ctdb_db_context *ctdb_db;
693         int ret;
694
695         ctdb_db = find_ctdb_db(ctdb, db_id);
696         if (!ctdb_db) {
697                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
698                 return -1;
699         }
700
701         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
702         if (ret != 0) {
703                 DEBUG(DEBUG_ERR,(__location__
704                                  " ctdb_load_persistent_health(%s) failed\n",
705                                  ctdb_db->db_name));
706                 return -1;
707         }
708
709         *outdata = tdb_null;
710         if (ctdb_db->unhealthy_reason) {
711                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
712                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
713         }
714
715         return 0;
716 }
717
718
719 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
720 {
721         char *ropath;
722
723         if (ctdb_db->readonly) {
724                 return 0;
725         }
726
727         if (ctdb_db->persistent) {
728                 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
729                 return -1;
730         }
731
732         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
733         if (ropath == NULL) {
734                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
735                 return -1;
736         }
737         ctdb_db->rottdb = tdb_open(ropath, 
738                               ctdb->tunable.database_hash_size, 
739                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
740                               O_CREAT|O_RDWR, 0600);
741         if (ctdb_db->rottdb == NULL) {
742                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
743                 talloc_free(ropath);
744                 return -1;
745         }
746
747         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
748
749         ctdb_db->readonly = true;
750
751         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
752
753         talloc_free(ropath);
754         return 0;
755 }
756
757 /*
758   attach to a database, handling both persistent and non-persistent databases
759   return 0 on success, -1 on failure
760  */
761 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
762                              bool persistent, const char *unhealthy_reason,
763                              bool jenkinshash, bool mutexes)
764 {
765         struct ctdb_db_context *ctdb_db, *tmp_db;
766         int ret;
767         struct TDB_DATA key;
768         unsigned tdb_flags;
769         int mode = 0600;
770         int remaining_tries = 0;
771
772         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
773         CTDB_NO_MEMORY(ctdb, ctdb_db);
774
775         ctdb_db->ctdb = ctdb;
776         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
777         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
778
779         key.dsize = strlen(db_name)+1;
780         key.dptr  = discard_const(db_name);
781         ctdb_db->db_id = ctdb_hash(&key);
782         ctdb_db->persistent = persistent;
783
784         if (!ctdb_db->persistent) {
785                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
786                 if (ctdb_db->delete_queue == NULL) {
787                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
788                 }
789
790                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
791         }
792
793         /* check for hash collisions */
794         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
795                 if (tmp_db->db_id == ctdb_db->db_id) {
796                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
797                                  tmp_db->db_id, db_name, tmp_db->db_name));
798                         talloc_free(ctdb_db);
799                         return -1;
800                 }
801         }
802
803         if (persistent) {
804                 if (unhealthy_reason) {
805                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
806                                                             unhealthy_reason, 0);
807                         if (ret != 0) {
808                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
809                                                    ctdb_db->db_name, unhealthy_reason, ret));
810                                 talloc_free(ctdb_db);
811                                 return -1;
812                         }
813                 }
814
815                 if (ctdb->max_persistent_check_errors > 0) {
816                         remaining_tries = 1;
817                 }
818                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
819                         remaining_tries = 0;
820                 }
821
822                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
823                 if (ret != 0) {
824                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
825                                    ctdb_db->db_name, ret));
826                         talloc_free(ctdb_db);
827                         return -1;
828                 }
829         }
830
831         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
832                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
833                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
834                 talloc_free(ctdb_db);
835                 return -1;
836         }
837
838         if (ctdb_db->unhealthy_reason) {
839                 /* this is just a warning, but we want that in the log file! */
840                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
841                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
842         }
843
844         /* open the database */
845         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
846                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
847                                            db_name, ctdb->pnn);
848
849         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
850         if (ctdb->valgrinding) {
851                 tdb_flags |= TDB_NOMMAP;
852         }
853         tdb_flags |= TDB_DISALLOW_NESTING;
854         if (jenkinshash) {
855                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
856         }
857 #ifdef TDB_MUTEX_LOCKING
858         if (ctdb->tunable.mutex_enabled && mutexes &&
859             tdb_runtime_check_for_robust_mutexes()) {
860                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
861         }
862 #endif
863
864 again:
865         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
866                                       ctdb->tunable.database_hash_size, 
867                                       tdb_flags, 
868                                       O_CREAT|O_RDWR, mode);
869         if (ctdb_db->ltdb == NULL) {
870                 struct stat st;
871                 int saved_errno = errno;
872
873                 if (!persistent) {
874                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
875                                           ctdb_db->db_path,
876                                           saved_errno,
877                                           strerror(saved_errno)));
878                         talloc_free(ctdb_db);
879                         return -1;
880                 }
881
882                 if (remaining_tries == 0) {
883                         DEBUG(DEBUG_CRIT,(__location__
884                                           "Failed to open persistent tdb '%s': %d - %s\n",
885                                           ctdb_db->db_path,
886                                           saved_errno,
887                                           strerror(saved_errno)));
888                         talloc_free(ctdb_db);
889                         return -1;
890                 }
891
892                 ret = stat(ctdb_db->db_path, &st);
893                 if (ret != 0) {
894                         DEBUG(DEBUG_CRIT,(__location__
895                                           "Failed to open persistent tdb '%s': %d - %s\n",
896                                           ctdb_db->db_path,
897                                           saved_errno,
898                                           strerror(saved_errno)));
899                         talloc_free(ctdb_db);
900                         return -1;
901                 }
902
903                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
904                 if (ret != 0) {
905                         DEBUG(DEBUG_CRIT,(__location__
906                                           "Failed to open persistent tdb '%s': %d - %s\n",
907                                           ctdb_db->db_path,
908                                           saved_errno,
909                                           strerror(saved_errno)));
910                         talloc_free(ctdb_db);
911                         return -1;
912                 }
913
914                 remaining_tries--;
915                 mode = st.st_mode;
916                 goto again;
917         }
918
919         if (!persistent) {
920                 ctdb_check_db_empty(ctdb_db);
921         } else {
922                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
923                 if (ret != 0) {
924                         int fd;
925                         struct stat st;
926
927                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
928                                           ctdb_db->db_path, ret,
929                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
930                         if (remaining_tries == 0) {
931                                 talloc_free(ctdb_db);
932                                 return -1;
933                         }
934
935                         fd = tdb_fd(ctdb_db->ltdb->tdb);
936                         ret = fstat(fd, &st);
937                         if (ret != 0) {
938                                 DEBUG(DEBUG_CRIT,(__location__
939                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
940                                                   ctdb_db->db_path,
941                                                   errno,
942                                                   strerror(errno)));
943                                 talloc_free(ctdb_db);
944                                 return -1;
945                         }
946
947                         /* close the TDB */
948                         talloc_free(ctdb_db->ltdb);
949                         ctdb_db->ltdb = NULL;
950
951                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
952                         if (ret != 0) {
953                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
954                                                   ctdb_db->db_path));
955                                 talloc_free(ctdb_db);
956                                 return -1;
957                         }
958
959                         remaining_tries--;
960                         mode = st.st_mode;
961                         goto again;
962                 }
963         }
964
965         /* set up a rb tree we can use to track which records we have a 
966            fetch-lock in-flight for so we can defer any additional calls
967            for the same record.
968          */
969         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
970         if (ctdb_db->deferred_fetch == NULL) {
971                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
972                 talloc_free(ctdb_db);
973                 return -1;
974         }
975
976         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
977         if (ctdb_db->defer_dmaster == NULL) {
978                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
979                                   ctdb_db->db_name));
980                 talloc_free(ctdb_db);
981                 return -1;
982         }
983
984         DLIST_ADD(ctdb->db_list, ctdb_db);
985
986         /* setting this can help some high churn databases */
987         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
988
989         /* 
990            all databases support the "null" function. we need this in
991            order to do forced migration of records
992         */
993         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
994         if (ret != 0) {
995                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
996                 talloc_free(ctdb_db);
997                 return -1;
998         }
999
1000         /* 
1001            all databases support the "fetch" function. we need this
1002            for efficient Samba3 ctdb fetch
1003         */
1004         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1005         if (ret != 0) {
1006                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1007                 talloc_free(ctdb_db);
1008                 return -1;
1009         }
1010
1011         /* 
1012            all databases support the "fetch_with_header" function. we need this
1013            for efficient readonly record fetches
1014         */
1015         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1016         if (ret != 0) {
1017                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1018                 talloc_free(ctdb_db);
1019                 return -1;
1020         }
1021
1022         ret = ctdb_vacuum_init(ctdb_db);
1023         if (ret != 0) {
1024                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1025                                   "database '%s'\n", ctdb_db->db_name));
1026                 talloc_free(ctdb_db);
1027                 return -1;
1028         }
1029
1030         ctdb_db->generation = ctdb->vnn_map->generation;
1031
1032         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1033                             ctdb_db->db_path, tdb_flags));
1034
1035         /* success */
1036         return 0;
1037 }
1038
1039
/*
  bookkeeping for a client DB attach request whose processing has been
  put off until later
 */
struct ctdb_deferred_attach_context {
	/* links for the ctdb->deferred_attach list */
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	/* daemon context whose deferred_attach list holds this entry */
	struct ctdb_context *ctdb;
	/* the attach control request that was deferred */
	struct ctdb_req_control_old *c;
};
1045
1046
1047 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1048 {
1049         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1050
1051         return 0;
1052 }
1053
1054 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1055                                          struct tevent_timer *te,
1056                                          struct timeval t, void *private_data)
1057 {
1058         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1059         struct ctdb_context *ctdb = da_ctx->ctdb;
1060
1061         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1062         talloc_free(da_ctx);
1063 }
1064
1065 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1066                                           struct tevent_timer *te,
1067                                           struct timeval t, void *private_data)
1068 {
1069         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1070         struct ctdb_context *ctdb = da_ctx->ctdb;
1071
1072         /* This talloc-steals the packet ->c */
1073         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1074         talloc_free(da_ctx);
1075 }
1076
1077 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1078 {
1079         struct ctdb_deferred_attach_context *da_ctx;
1080
1081         /* call it from the main event loop as soon as the current event 
1082            finishes.
1083          */
1084         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1085                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1086                 tevent_add_timer(ctdb->ev, da_ctx,
1087                                  timeval_current_ofs(1,0),
1088                                  ctdb_deferred_attach_callback, da_ctx);
1089         }
1090
1091         return 0;
1092 }
1093
1094 /*
1095   a client has asked to attach a new database
1096  */
1097 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1098                                TDB_DATA *outdata, uint64_t tdb_flags, 
1099                                bool persistent, uint32_t client_id,
1100                                struct ctdb_req_control_old *c,
1101                                bool *async_reply)
1102 {
1103         const char *db_name = (const char *)indata.dptr;
1104         struct ctdb_db_context *db;
1105         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1106         struct ctdb_client *client = NULL;
1107         bool with_jenkinshash, with_mutexes;
1108
1109         if (ctdb->tunable.allow_client_db_attach == 0) {
1110                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1111                                   "AllowClientDBAccess == 0\n", db_name));
1112                 return -1;
1113         }
1114
1115         /* don't allow any local clients to attach while we are in recovery mode
1116          * except for the recovery daemon.
1117          * allow all attach from the network since these are always from remote
1118          * recovery daemons.
1119          */
1120         if (client_id != 0) {
1121                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1122         }
1123         if (client != NULL) {
1124                 /* If the node is inactive it is not part of the cluster
1125                    and we should not allow clients to attach to any
1126                    databases
1127                 */
1128                 if (node->flags & NODE_FLAGS_INACTIVE) {
1129                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1130                         return -1;
1131                 }
1132
1133                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1134                     client->pid != ctdb->recoverd_pid &&
1135                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1136                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1137
1138                         if (da_ctx == NULL) {
1139                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1140                                 return -1;
1141                         }
1142
1143                         da_ctx->ctdb = ctdb;
1144                         da_ctx->c = talloc_steal(da_ctx, c);
1145                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1146                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1147
1148                         tevent_add_timer(ctdb->ev, da_ctx,
1149                                          timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1150                                          ctdb_deferred_attach_timeout, da_ctx);
1151
1152                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1153                         *async_reply = true;
1154                         return 0;
1155                 }
1156         }
1157
1158         /* the client can optionally pass additional tdb flags, but we
1159            only allow a subset of those on the database in ctdb. Note
1160            that tdb_flags is passed in via the (otherwise unused)
1161            srvid to the attach control */
1162 #ifdef TDB_MUTEX_LOCKING
1163         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1164 #else
1165         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1166 #endif
1167
1168         /* see if we already have this name */
1169         db = ctdb_db_handle(ctdb, db_name);
1170         if (db) {
1171                 if (db->persistent != persistent) {
1172                         DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1173                                           "database %s\n", persistent ? "" : "non-",
1174                                           db-> persistent ? "" : "non-", db_name));
1175                         return -1;
1176                 }
1177                 outdata->dptr  = (uint8_t *)&db->db_id;
1178                 outdata->dsize = sizeof(db->db_id);
1179                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1180                 return 0;
1181         }
1182
1183         with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1184 #ifdef TDB_MUTEX_LOCKING
1185         with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1186 #else
1187         with_mutexes = false;
1188 #endif
1189
1190         if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1191                               with_jenkinshash, with_mutexes) != 0) {
1192                 return -1;
1193         }
1194
1195         db = ctdb_db_handle(ctdb, db_name);
1196         if (!db) {
1197                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1198                 return -1;
1199         }
1200
1201         /* remember the flags the client has specified */
1202         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1203
1204         outdata->dptr  = (uint8_t *)&db->db_id;
1205         outdata->dsize = sizeof(db->db_id);
1206
1207         /* Try to ensure it's locked in mem */
1208         lockdown_memory(ctdb->valgrinding);
1209
1210         /* tell all the other nodes about this database */
1211         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1212                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1213                                                 CTDB_CONTROL_DB_ATTACH,
1214                                  0, CTDB_CTRL_FLAG_NOREPLY,
1215                                  indata, NULL, NULL);
1216
1217         /* success */
1218         return 0;
1219 }
1220
1221 /*
1222  * a client has asked to detach from a database
1223  */
1224 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1225                                uint32_t client_id)
1226 {
1227         uint32_t db_id;
1228         struct ctdb_db_context *ctdb_db;
1229         struct ctdb_client *client = NULL;
1230
1231         db_id = *(uint32_t *)indata.dptr;
1232         ctdb_db = find_ctdb_db(ctdb, db_id);
1233         if (ctdb_db == NULL) {
1234                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1235                                   db_id));
1236                 return -1;
1237         }
1238
1239         if (ctdb->tunable.allow_client_db_attach == 1) {
1240                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1241                                   "Clients are allowed access to databases "
1242                                   "(AllowClientDBAccess == 1)\n",
1243                                   ctdb_db->db_name));
1244                 return -1;
1245         }
1246
1247         if (ctdb_db->persistent) {
1248                 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1249                                   "denied\n", ctdb_db->db_name));
1250                 return -1;
1251         }
1252
1253         /* Cannot detach from database when in recovery */
1254         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1255                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1256                 return -1;
1257         }
1258
1259         /* If a control comes from a client, then broadcast it to all nodes.
1260          * Do the actual detach only if the control comes from other daemons.
1261          */
1262         if (client_id != 0) {
1263                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1264                 if (client != NULL) {
1265                         /* forward the control to all the nodes */
1266                         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1267                                                  CTDB_CONTROL_DB_DETACH, 0,
1268                                                  CTDB_CTRL_FLAG_NOREPLY,
1269                                                  indata, NULL, NULL);
1270                         return 0;
1271                 }
1272                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1273                                   "for database '%s'\n", ctdb_db->db_name));
1274                 return -1;
1275         }
1276
1277         /* Detach database from recoverd */
1278         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1279                                      CTDB_SRVID_DETACH_DATABASE,
1280                                      indata) != 0) {
1281                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1282                 return -1;
1283         }
1284
1285         /* Disable vacuuming and drop all vacuuming data */
1286         talloc_free(ctdb_db->vacuum_handle);
1287         talloc_free(ctdb_db->delete_queue);
1288
1289         /* Terminate any deferred fetch */
1290         talloc_free(ctdb_db->deferred_fetch);
1291
1292         /* Terminate any traverses */
1293         while (ctdb_db->traverse) {
1294                 talloc_free(ctdb_db->traverse);
1295         }
1296
1297         /* Terminate any revokes */
1298         while (ctdb_db->revokechild_active) {
1299                 talloc_free(ctdb_db->revokechild_active);
1300         }
1301
1302         /* Free readonly tracking database */
1303         if (ctdb_db->readonly) {
1304                 talloc_free(ctdb_db->rottdb);
1305         }
1306
1307         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1308
1309         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1310                              ctdb_db->db_name));
1311         talloc_free(ctdb_db);
1312
1313         return 0;
1314 }
1315
1316 /*
1317   attach to all existing persistent databases
1318  */
1319 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1320                                   const char *unhealthy_reason)
1321 {
1322         DIR *d;
1323         struct dirent *de;
1324
1325         /* open the persistent db directory and scan it for files */
1326         d = opendir(ctdb->db_directory_persistent);
1327         if (d == NULL) {
1328                 return 0;
1329         }
1330
1331         while ((de=readdir(d))) {
1332                 char *p, *s, *q;
1333                 size_t len = strlen(de->d_name);
1334                 uint32_t node;
1335                 int invalid_name = 0;
1336                 
1337                 s = talloc_strdup(ctdb, de->d_name);
1338                 if (s == NULL) {
1339                         closedir(d);
1340                         CTDB_NO_MEMORY(ctdb, s);
1341                 }
1342
1343                 /* only accept names ending in .tdb */
1344                 p = strstr(s, ".tdb.");
1345                 if (len < 7 || p == NULL) {
1346                         talloc_free(s);
1347                         continue;
1348                 }
1349
1350                 /* only accept names ending with .tdb. and any number of digits */
1351                 q = p+5;
1352                 while (*q != 0 && invalid_name == 0) {
1353                         if (!isdigit(*q++)) {
1354                                 invalid_name = 1;
1355                         }
1356                 }
1357                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1358                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1359                         talloc_free(s);
1360                         continue;
1361                 }
1362                 p[4] = 0;
1363
1364                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1365                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1366                         closedir(d);
1367                         talloc_free(s);
1368                         return -1;
1369                 }
1370
1371                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1372
1373                 talloc_free(s);
1374         }
1375         closedir(d);
1376         return 0;
1377 }
1378
1379 int ctdb_attach_databases(struct ctdb_context *ctdb)
1380 {
1381         int ret;
1382         char *persistent_health_path = NULL;
1383         char *unhealthy_reason = NULL;
1384         bool first_try = true;
1385
1386         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1387                                                  ctdb->db_directory_state,
1388                                                  PERSISTENT_HEALTH_TDB,
1389                                                  ctdb->pnn);
1390         if (persistent_health_path == NULL) {
1391                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1392                 return -1;
1393         }
1394
1395 again:
1396
1397         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1398                                                    0, TDB_DISALLOW_NESTING,
1399                                                    O_CREAT | O_RDWR, 0600);
1400         if (ctdb->db_persistent_health == NULL) {
1401                 struct tdb_wrap *tdb;
1402
1403                 if (!first_try) {
1404                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1405                                           persistent_health_path,
1406                                           errno,
1407                                           strerror(errno)));
1408                         talloc_free(persistent_health_path);
1409                         talloc_free(unhealthy_reason);
1410                         return -1;
1411                 }
1412                 first_try = false;
1413
1414                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1415                                                    persistent_health_path,
1416                                                    "was cleared after a failure",
1417                                                    "manual verification needed");
1418                 if (unhealthy_reason == NULL) {
1419                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1420                         talloc_free(persistent_health_path);
1421                         return -1;
1422                 }
1423
1424                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1425                                   persistent_health_path));
1426                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1427                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1428                                     O_CREAT | O_RDWR, 0600);
1429                 if (tdb) {
1430                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1431                                           persistent_health_path,
1432                                           errno,
1433                                           strerror(errno)));
1434                         talloc_free(persistent_health_path);
1435                         talloc_free(unhealthy_reason);
1436                         return -1;
1437                 }
1438
1439                 talloc_free(tdb);
1440                 goto again;
1441         }
1442         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1443         if (ret != 0) {
1444                 struct tdb_wrap *tdb;
1445
1446                 talloc_free(ctdb->db_persistent_health);
1447                 ctdb->db_persistent_health = NULL;
1448
1449                 if (!first_try) {
1450                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1451                                           persistent_health_path));
1452                         talloc_free(persistent_health_path);
1453                         talloc_free(unhealthy_reason);
1454                         return -1;
1455                 }
1456                 first_try = false;
1457
1458                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1459                                                    persistent_health_path,
1460                                                    "was cleared after a failure",
1461                                                    "manual verification needed");
1462                 if (unhealthy_reason == NULL) {
1463                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1464                         talloc_free(persistent_health_path);
1465                         return -1;
1466                 }
1467
1468                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1469                                   persistent_health_path));
1470                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1471                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1472                                     O_CREAT | O_RDWR, 0600);
1473                 if (tdb) {
1474                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1475                                           persistent_health_path,
1476                                           errno,
1477                                           strerror(errno)));
1478                         talloc_free(persistent_health_path);
1479                         talloc_free(unhealthy_reason);
1480                         return -1;
1481                 }
1482
1483                 talloc_free(tdb);
1484                 goto again;
1485         }
1486         talloc_free(persistent_health_path);
1487
1488         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1489         talloc_free(unhealthy_reason);
1490         if (ret != 0) {
1491                 return ret;
1492         }
1493
1494         return 0;
1495 }
1496
1497 /*
1498   called when a broadcast seqnum update comes in
1499  */
1500 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1501 {
1502         struct ctdb_db_context *ctdb_db;
1503         if (srcnode == ctdb->pnn) {
1504                 /* don't update ourselves! */
1505                 return 0;
1506         }
1507
1508         ctdb_db = find_ctdb_db(ctdb, db_id);
1509         if (!ctdb_db) {
1510                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1511                 return -1;
1512         }
1513
1514         if (ctdb_db->unhealthy_reason) {
1515                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1516                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1517                 return -1;
1518         }
1519
1520         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1521         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1522         return 0;
1523 }
1524
1525 /*
1526   timer to check for seqnum changes in a ltdb and propogate them
1527  */
1528 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1529                                    struct tevent_timer *te,
1530                                    struct timeval t, void *p)
1531 {
1532         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1533         struct ctdb_context *ctdb = ctdb_db->ctdb;
1534         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1535         if (new_seqnum != ctdb_db->seqnum) {
1536                 /* something has changed - propogate it */
1537                 TDB_DATA data;
1538                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1539                 data.dsize = sizeof(uint32_t);
1540                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1541                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1542                                          data, NULL, NULL);             
1543         }
1544         ctdb_db->seqnum = new_seqnum;
1545
1546         /* setup a new timer */
1547         ctdb_db->seqnum_update =
1548                 tevent_add_timer(ctdb->ev, ctdb_db,
1549                                  timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1550                                                      (ctdb->tunable.seqnum_interval%1000)*1000),
1551                                  ctdb_ltdb_seqnum_check, ctdb_db);
1552 }
1553
1554 /*
1555   enable seqnum handling on this db
1556  */
1557 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1558 {
1559         struct ctdb_db_context *ctdb_db;
1560         ctdb_db = find_ctdb_db(ctdb, db_id);
1561         if (!ctdb_db) {
1562                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1563                 return -1;
1564         }
1565
1566         if (ctdb_db->seqnum_update == NULL) {
1567                 ctdb_db->seqnum_update = tevent_add_timer(
1568                         ctdb->ev, ctdb_db,
1569                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1570                                             (ctdb->tunable.seqnum_interval%1000)*1000),
1571                         ctdb_ltdb_seqnum_check, ctdb_db);
1572         }
1573
1574         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1575         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1576         return 0;
1577 }
1578
1579 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1580 {
1581         if (ctdb_db->sticky) {
1582                 return 0;
1583         }
1584
1585         if (ctdb_db->persistent) {
1586                 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1587                 return -1;
1588         }
1589
1590         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1591
1592         ctdb_db->sticky = true;
1593
1594         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1595
1596         return 0;
1597 }
1598
1599 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1600 {
1601         struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1602         int i;
1603
1604         for (i=0; i<MAX_HOT_KEYS; i++) {
1605                 if (s->hot_keys[i].key.dsize > 0) {
1606                         talloc_free(s->hot_keys[i].key.dptr);
1607                 }
1608         }
1609
1610         ZERO_STRUCT(ctdb_db->statistics);
1611 }
1612
1613 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1614                                 uint32_t db_id,
1615                                 TDB_DATA *outdata)
1616 {
1617         struct ctdb_db_context *ctdb_db;
1618         struct ctdb_db_statistics_old *stats;
1619         int i;
1620         int len;
1621         char *ptr;
1622
1623         ctdb_db = find_ctdb_db(ctdb, db_id);
1624         if (!ctdb_db) {
1625                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1626                 return -1;
1627         }
1628
1629         len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1630         for (i = 0; i < MAX_HOT_KEYS; i++) {
1631                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1632         }
1633
1634         stats = talloc_size(outdata, len);
1635         if (stats == NULL) {
1636                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1637                 return -1;
1638         }
1639
1640         memcpy(stats, &ctdb_db->statistics,
1641                offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1642
1643         stats->num_hot_keys = MAX_HOT_KEYS;
1644
1645         ptr = &stats->hot_keys_wire[0];
1646         for (i = 0; i < MAX_HOT_KEYS; i++) {
1647                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1648                        ctdb_db->statistics.hot_keys[i].key.dsize);
1649                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1650         }
1651
1652         outdata->dptr  = (uint8_t *)stats;
1653         outdata->dsize = len;
1654
1655         return 0;
1656 }