vfs_fruit: change trigger points of AppleDouble conversion
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 #include "server/ctdb_config.h"
45
46 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
47
48 /**
49  * write a record to a normal database
50  *
51  * This is the server-variant of the ctdb_ltdb_store function.
52  * It contains logic to determine whether a record should be
53  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
54  * controls to the local ctdb daemon if apporpriate.
55  */
56 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
57                                   TDB_DATA key,
58                                   struct ctdb_ltdb_header *header,
59                                   TDB_DATA data)
60 {
61         struct ctdb_context *ctdb = ctdb_db->ctdb;
62         TDB_DATA rec[2];
63         uint32_t hsize = sizeof(struct ctdb_ltdb_header);
64         int ret;
65         bool seqnum_suppressed = false;
66         bool keep = false;
67         bool schedule_for_deletion = false;
68         bool remove_from_delete_queue = false;
69         uint32_t lmaster;
70
71         if (ctdb->flags & CTDB_FLAG_TORTURE) {
72                 TDB_DATA old;
73                 struct ctdb_ltdb_header *h2;
74
75                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
76                 h2 = (struct ctdb_ltdb_header *)old.dptr;
77                 if (old.dptr != NULL &&
78                     old.dsize >= hsize &&
79                     h2->rsn > header->rsn) {
80                         DEBUG(DEBUG_ERR,
81                               ("RSN regression! %"PRIu64" %"PRIu64"\n",
82                                h2->rsn, header->rsn));
83                 }
84                 if (old.dptr) {
85                         free(old.dptr);
86                 }
87         }
88
89         if (ctdb->vnn_map == NULL) {
90                 /*
91                  * Called from a client: always store the record
92                  * Also don't call ctdb_lmaster since it uses the vnn_map!
93                  */
94                 keep = true;
95                 goto store;
96         }
97
98         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
99
100         /*
101          * If we migrate an empty record off to another node
102          * and the record has not been migrated with data,
103          * delete the record instead of storing the empty record.
104          */
105         if (data.dsize != 0) {
106                 keep = true;
107         } else if (header->flags & CTDB_REC_RO_FLAGS) {
108                 keep = true;
109         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
110                 /*
111                  * The record is not created by the client but
112                  * automatically by the ctdb_ltdb_fetch logic that
113                  * creates a record with an initial header in the
114                  * ltdb before trying to migrate the record from
115                  * the current lmaster. Keep it instead of trying
116                  * to delete the non-existing record...
117                  */
118                 keep = true;
119                 schedule_for_deletion = true;
120         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
121                 keep = true;
122         } else if (ctdb_db->ctdb->pnn == lmaster) {
123                 /*
124                  * If we are lmaster, then we usually keep the record.
125                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
126                  * and the record is empty and has never been migrated
127                  * with data, then we should delete it instead of storing it.
128                  * This is part of the vacuuming process.
129                  *
130                  * The reason that we usually need to store even empty records
131                  * on the lmaster is that a client operating directly on the
132                  * lmaster (== dmaster) expects the local copy of the record to
133                  * exist after successful ctdb migrate call. If the record does
134                  * not exist, the client goes into a migrate loop and eventually
135                  * fails. So storing the empty record makes sure that we do not
136                  * need to change the client code.
137                  */
138                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
139                         keep = true;
140                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
141                         keep = true;
142                 }
143         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
144                 keep = true;
145         }
146
147         if (keep) {
148                 if (ctdb_db_volatile(ctdb_db) &&
149                     (ctdb_db->ctdb->pnn == header->dmaster) &&
150                     !(header->flags & CTDB_REC_RO_FLAGS))
151                 {
152                         header->rsn++;
153
154                         if (data.dsize == 0) {
155                                 schedule_for_deletion = true;
156                         }
157                 }
158                 remove_from_delete_queue = !schedule_for_deletion;
159         }
160
161 store:
162         /*
163          * The VACUUM_MIGRATED flag is only set temporarily for
164          * the above logic when the record was retrieved by a
165          * VACUUM_MIGRATE call and should not be stored in the
166          * database.
167          *
168          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
169          * and there are two cases in which the corresponding record
170          * is stored in the local database:
171          * 1. The record has been migrated with data in the past
172          *    (the MIGRATED_WITH_DATA record flag is set).
173          * 2. The record has been filled with data again since it
174          *    had been submitted in the VACUUM_FETCH message to the
175          *    lmaster.
176          * For such records it is important to not store the
177          * VACUUM_MIGRATED flag in the database.
178          */
179         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
180
181         /*
182          * Similarly, clear the AUTOMATIC flag which should not enter
183          * the local database copy since this would require client
184          * modifications to clear the flag when the client stores
185          * the record.
186          */
187         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
188
189         rec[0].dsize = hsize;
190         rec[0].dptr = (uint8_t *)header;
191
192         rec[1].dsize = data.dsize;
193         rec[1].dptr = data.dptr;
194
195         /* Databases with seqnum updates enabled only get their seqnum
196            changes when/if we modify the data */
197         if (ctdb_db->seqnum_update != NULL) {
198                 TDB_DATA old;
199                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
200
201                 if ((old.dsize == hsize + data.dsize) &&
202                     memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
203                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
204                         seqnum_suppressed = true;
205                 }
206                 if (old.dptr != NULL) {
207                         free(old.dptr);
208                 }
209         }
210
211         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
212                             ctdb_db->db_name,
213                             keep?"storing":"deleting",
214                             ctdb_hash(&key)));
215
216         if (keep) {
217                 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
218         } else {
219                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
220         }
221
222         if (ret != 0) {
223                 int lvl = DEBUG_ERR;
224
225                 if (keep == false &&
226                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
227                 {
228                         lvl = DEBUG_DEBUG;
229                 }
230
231                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
232                             "%d - %s\n",
233                             ctdb_db->db_name,
234                             keep?"store":"delete", ret,
235                             tdb_errorstr(ctdb_db->ltdb->tdb)));
236
237                 schedule_for_deletion = false;
238                 remove_from_delete_queue = false;
239         }
240         if (seqnum_suppressed) {
241                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
242         }
243
244         if (schedule_for_deletion) {
245                 int ret2;
246                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
247                 if (ret2 != 0) {
248                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
249                 }
250         }
251
252         if (remove_from_delete_queue) {
253                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
254         }
255
256         return ret;
257 }
258
259 struct lock_fetch_state {
260         struct ctdb_context *ctdb;
261         struct ctdb_db_context *ctdb_db;
262         void (*recv_pkt)(void *, struct ctdb_req_header *);
263         void *recv_context;
264         struct ctdb_req_header *hdr;
265         uint32_t generation;
266         bool ignore_generation;
267 };
268
269 /*
270   called when we should retry the operation
271  */
272 static void lock_fetch_callback(void *p, bool locked)
273 {
274         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
275         if (!state->ignore_generation &&
276             state->generation != state->ctdb_db->generation) {
277                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
278                 talloc_free(state->hdr);
279                 return;
280         }
281         state->recv_pkt(state->recv_context, state->hdr);
282         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
283 }
284
285
286 /*
287   do a non-blocking ltdb_lock, deferring this ctdb request until we
288   have the chainlock
289
290   It does the following:
291
292    1) tries to get the chainlock. If it succeeds, then it returns 0
293
294    2) if it fails to get a chainlock immediately then it sets up a
295    non-blocking chainlock via ctdb_lock_record, and when it gets the
296    chainlock it re-submits this ctdb request to the main packet
297    receive function.
298
299    This effectively queues all ctdb requests that cannot be
300    immediately satisfied until it can get the lock. This means that
301    the main ctdb daemon will not block waiting for a chainlock held by
302    a client
303
304    There are 3 possible return values:
305
306        0:    means that it got the lock immediately.
307       -1:    means that it failed to get the lock, and won't retry
308       -2:    means that it failed to get the lock immediately, but will retry
309  */
310 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
311                            TDB_DATA key, struct ctdb_req_header *hdr,
312                            void (*recv_pkt)(void *, struct ctdb_req_header *),
313                            void *recv_context, bool ignore_generation)
314 {
315         int ret;
316         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
317         struct lock_request *lreq;
318         struct lock_fetch_state *state;
319         
320         ret = tdb_chainlock_nonblock(tdb, key);
321
322         if (ret != 0 &&
323             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
324                 /* a hard failure - don't try again */
325                 return -1;
326         }
327
328         /* when torturing, ensure we test the contended path */
329         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
330             random() % 5 == 0) {
331                 ret = -1;
332                 tdb_chainunlock(tdb, key);
333         }
334
335         /* first the non-contended path */
336         if (ret == 0) {
337                 return 0;
338         }
339
340         state = talloc(hdr, struct lock_fetch_state);
341         state->ctdb = ctdb_db->ctdb;
342         state->ctdb_db = ctdb_db;
343         state->hdr = hdr;
344         state->recv_pkt = recv_pkt;
345         state->recv_context = recv_context;
346         state->generation = ctdb_db->generation;
347         state->ignore_generation = ignore_generation;
348
349         /* now the contended path */
350         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
351         if (lreq == NULL) {
352                 return -1;
353         }
354
355         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
356            so it won't be freed yet */
357         talloc_steal(state, hdr);
358
359         /* now tell the caller than we will retry asynchronously */
360         return -2;
361 }
362
363 /*
364   a varient of ctdb_ltdb_lock_requeue that also fetches the record
365  */
366 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
367                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
368                                  struct ctdb_req_header *hdr, TDB_DATA *data,
369                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
370                                  void *recv_context, bool ignore_generation)
371 {
372         int ret;
373
374         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
375                                      recv_context, ignore_generation);
376         if (ret != 0) {
377                 return ret;
378         }
379
380         ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
381         if (ret != 0) {
382                 int uret;
383                 uret = ctdb_ltdb_unlock(ctdb_db, key);
384                 if (uret != 0) {
385                         DBG_ERR("ctdb_ltdb_unlock() failed with error %d\n",
386                                 uret);
387                 }
388         }
389         return ret;
390 }
391
392
393 /*
394   paranoid check to see if the db is empty
395  */
396 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
397 {
398         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
399         int count = tdb_traverse_read(tdb, NULL, NULL);
400         if (count != 0) {
401                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
402                          ctdb_db->db_path));
403                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
404         }
405 }
406
407 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
408                                 struct ctdb_db_context *ctdb_db)
409 {
410         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
411         char *old;
412         char *reason = NULL;
413         TDB_DATA key;
414         TDB_DATA val;
415
416         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
417         key.dsize = strlen(ctdb_db->db_name);
418
419         old = ctdb_db->unhealthy_reason;
420         ctdb_db->unhealthy_reason = NULL;
421
422         val = tdb_fetch(tdb, key);
423         if (val.dsize > 0) {
424                 reason = talloc_strndup(ctdb_db,
425                                         (const char *)val.dptr,
426                                         val.dsize);
427                 if (reason == NULL) {
428                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
429                                            (int)val.dsize));
430                         ctdb_db->unhealthy_reason = old;
431                         free(val.dptr);
432                         return -1;
433                 }
434         }
435
436         if (val.dptr) {
437                 free(val.dptr);
438         }
439
440         talloc_free(old);
441         ctdb_db->unhealthy_reason = reason;
442         return 0;
443 }
444
445 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
446                                   struct ctdb_db_context *ctdb_db,
447                                   const char *given_reason,/* NULL means healthy */
448                                   int num_healthy_nodes)
449 {
450         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
451         int ret;
452         TDB_DATA key;
453         TDB_DATA val;
454         char *new_reason = NULL;
455         char *old_reason = NULL;
456
457         ret = tdb_transaction_start(tdb);
458         if (ret != 0) {
459                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
460                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
461                 return -1;
462         }
463
464         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
465         if (ret != 0) {
466                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
467                                    ctdb_db->db_name, ret));
468                 return -1;
469         }
470         old_reason = ctdb_db->unhealthy_reason;
471
472         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
473         key.dsize = strlen(ctdb_db->db_name);
474
475         if (given_reason) {
476                 new_reason = talloc_strdup(ctdb_db, given_reason);
477                 if (new_reason == NULL) {
478                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
479                                           given_reason));
480                         return -1;
481                 }
482         } else if (old_reason && num_healthy_nodes == 0) {
483                 /*
484                  * If the reason indicates ok, but there where no healthy nodes
485                  * available, that it means, we have not recovered valid content
486                  * of the db. So if there's an old reason, prefix it with
487                  * "NO-HEALTHY-NODES - "
488                  */
489                 const char *prefix;
490
491 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
492                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
493                 if (ret != 0) {
494                         prefix = _TMP_PREFIX;
495                 } else {
496                         prefix = "";
497                 }
498                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
499                                          prefix, old_reason);
500                 if (new_reason == NULL) {
501                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
502                                           prefix, old_reason));
503                         return -1;
504                 }
505 #undef _TMP_PREFIX
506         }
507
508         if (new_reason) {
509                 val.dptr = discard_const_p(uint8_t, new_reason);
510                 val.dsize = strlen(new_reason);
511
512                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
513                 if (ret != 0) {
514                         tdb_transaction_cancel(tdb);
515                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
516                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
517                                            ret, tdb_errorstr(tdb)));
518                         talloc_free(new_reason);
519                         return -1;
520                 }
521                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
522                                    ctdb_db->db_name, new_reason));
523         } else if (old_reason) {
524                 ret = tdb_delete(tdb, key);
525                 if (ret != 0) {
526                         tdb_transaction_cancel(tdb);
527                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
528                                            tdb_name(tdb), ctdb_db->db_name,
529                                            ret, tdb_errorstr(tdb)));
530                         talloc_free(new_reason);
531                         return -1;
532                 }
533                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
534                                    ctdb_db->db_name));
535         }
536
537         ret = tdb_transaction_commit(tdb);
538         if (ret != TDB_SUCCESS) {
539                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
540                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
541                 talloc_free(new_reason);
542                 return -1;
543         }
544
545         talloc_free(old_reason);
546         ctdb_db->unhealthy_reason = new_reason;
547
548         return 0;
549 }
550
551 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
552                                      struct ctdb_db_context *ctdb_db)
553 {
554         time_t now = time(NULL);
555         char *new_path;
556         char *new_reason;
557         int ret;
558         struct tm *tm;
559
560         tm = gmtime(&now);
561
562         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
563         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
564                                    "%04u%02u%02u%02u%02u%02u.0Z",
565                                    ctdb_db->db_path,
566                                    tm->tm_year+1900, tm->tm_mon+1,
567                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
568                                    tm->tm_sec);
569         if (new_path == NULL) {
570                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
571                 return -1;
572         }
573
574         new_reason = talloc_asprintf(ctdb_db,
575                                      "ERROR - Backup of corrupted TDB in '%s'",
576                                      new_path);
577         if (new_reason == NULL) {
578                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
579                 return -1;
580         }
581         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
582         talloc_free(new_reason);
583         if (ret != 0) {
584                 DEBUG(DEBUG_CRIT,(__location__
585                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
586                                  ctdb_db->db_path));
587                 return -1;
588         }
589
590         ret = rename(ctdb_db->db_path, new_path);
591         if (ret != 0) {
592                 DEBUG(DEBUG_CRIT,(__location__
593                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
594                                   ctdb_db->db_path, new_path,
595                                   errno, strerror(errno)));
596                 talloc_free(new_path);
597                 return -1;
598         }
599
600         DEBUG(DEBUG_CRIT,(__location__
601                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
602                          ctdb_db->db_path, new_path));
603         talloc_free(new_path);
604         return 0;
605 }
606
607 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
608 {
609         struct ctdb_db_context *ctdb_db;
610         int ret;
611         int ok = 0;
612         int fail = 0;
613
614         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
615                 if (!ctdb_db_persistent(ctdb_db)) {
616                         continue;
617                 }
618
619                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
620                 if (ret != 0) {
621                         DEBUG(DEBUG_ALERT,(__location__
622                                            " load persistent health for '%s' failed\n",
623                                            ctdb_db->db_path));
624                         return -1;
625                 }
626
627                 if (ctdb_db->unhealthy_reason == NULL) {
628                         ok++;
629                         DEBUG(DEBUG_INFO,(__location__
630                                    " persistent db '%s' healthy\n",
631                                    ctdb_db->db_path));
632                         continue;
633                 }
634
635                 fail++;
636                 DEBUG(DEBUG_ALERT,(__location__
637                                    " persistent db '%s' unhealthy: %s\n",
638                                    ctdb_db->db_path,
639                                    ctdb_db->unhealthy_reason));
640         }
641         DEBUG(DEBUG_NOTICE,
642               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
643                ok, fail));
644
645         if (fail != 0) {
646                 return -1;
647         }
648
649         return 0;
650 }
651
652
653 /*
654   mark a database - as healthy
655  */
656 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
657 {
658         uint32_t db_id = *(uint32_t *)indata.dptr;
659         struct ctdb_db_context *ctdb_db;
660         int ret;
661         bool may_recover = false;
662
663         ctdb_db = find_ctdb_db(ctdb, db_id);
664         if (!ctdb_db) {
665                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
666                 return -1;
667         }
668
669         if (ctdb_db->unhealthy_reason) {
670                 may_recover = true;
671         }
672
673         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
674         if (ret != 0) {
675                 DEBUG(DEBUG_ERR,(__location__
676                                  " ctdb_update_persistent_health(%s) failed\n",
677                                  ctdb_db->db_name));
678                 return -1;
679         }
680
681         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
682                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
683                                   ctdb_db->db_name));
684                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
685         }
686
687         return 0;
688 }
689
690 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
691                                    TDB_DATA indata,
692                                    TDB_DATA *outdata)
693 {
694         uint32_t db_id = *(uint32_t *)indata.dptr;
695         struct ctdb_db_context *ctdb_db;
696         int ret;
697
698         ctdb_db = find_ctdb_db(ctdb, db_id);
699         if (!ctdb_db) {
700                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
701                 return -1;
702         }
703
704         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
705         if (ret != 0) {
706                 DEBUG(DEBUG_ERR,(__location__
707                                  " ctdb_load_persistent_health(%s) failed\n",
708                                  ctdb_db->db_name));
709                 return -1;
710         }
711
712         *outdata = tdb_null;
713         if (ctdb_db->unhealthy_reason) {
714                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
715                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
716         }
717
718         return 0;
719 }
720
721
722 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
723 {
724         char *ropath;
725
726         if (ctdb_db_readonly(ctdb_db)) {
727                 return 0;
728         }
729
730         if (! ctdb_db_volatile(ctdb_db)) {
731                 DEBUG(DEBUG_ERR,
732                       ("Non-volatile databases do not support readonly flag\n"));
733                 return -1;
734         }
735
736         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
737         if (ropath == NULL) {
738                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
739                 return -1;
740         }
741         ctdb_db->rottdb = tdb_open(ropath, 
742                               ctdb->tunable.database_hash_size, 
743                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
744                               O_CREAT|O_RDWR, 0600);
745         if (ctdb_db->rottdb == NULL) {
746                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
747                 talloc_free(ropath);
748                 return -1;
749         }
750
751         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
752
753         ctdb_db_set_readonly(ctdb_db);
754
755         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
756
757         talloc_free(ropath);
758         return 0;
759 }
760
761 /*
762   attach to a database, handling both persistent and non-persistent databases
763   return 0 on success, -1 on failure
764  */
765 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
766                              uint8_t db_flags, const char *unhealthy_reason)
767 {
768         struct ctdb_db_context *ctdb_db, *tmp_db;
769         int ret;
770         struct TDB_DATA key;
771         int tdb_flags;
772         int mode = 0600;
773         int remaining_tries = 0;
774
775         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
776         CTDB_NO_MEMORY(ctdb, ctdb_db);
777
778         ctdb_db->ctdb = ctdb;
779         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
780         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
781
782         key.dsize = strlen(db_name)+1;
783         key.dptr  = discard_const(db_name);
784         ctdb_db->db_id = ctdb_hash(&key);
785         ctdb_db->db_flags = db_flags;
786
787         if (ctdb_db_volatile(ctdb_db)) {
788                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
789                 if (ctdb_db->delete_queue == NULL) {
790                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
791                 }
792
793                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
794         }
795
796         /* check for hash collisions */
797         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
798                 if (tmp_db->db_id == ctdb_db->db_id) {
799                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
800                                  tmp_db->db_id, db_name, tmp_db->db_name));
801                         talloc_free(ctdb_db);
802                         return -1;
803                 }
804         }
805
806         if (ctdb_db_persistent(ctdb_db)) {
807                 if (unhealthy_reason) {
808                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
809                                                             unhealthy_reason, 0);
810                         if (ret != 0) {
811                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
812                                                    ctdb_db->db_name, unhealthy_reason, ret));
813                                 talloc_free(ctdb_db);
814                                 return -1;
815                         }
816                 }
817
818                 if (ctdb->max_persistent_check_errors > 0) {
819                         remaining_tries = 1;
820                 }
821                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
822                         remaining_tries = 0;
823                 }
824
825                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
826                 if (ret != 0) {
827                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
828                                    ctdb_db->db_name, ret));
829                         talloc_free(ctdb_db);
830                         return -1;
831                 }
832         }
833
834         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
835                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
836                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
837                 talloc_free(ctdb_db);
838                 return -1;
839         }
840
841         if (ctdb_db->unhealthy_reason) {
842                 /* this is just a warning, but we want that in the log file! */
843                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
844                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
845         }
846
847         /* open the database */
848         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
849                                            ctdb_db_persistent(ctdb_db) ?
850                                                 ctdb->db_directory_persistent :
851                                                 ctdb->db_directory,
852                                            db_name, ctdb->pnn);
853
854         tdb_flags = ctdb_db_tdb_flags(db_flags,
855                                       ctdb->valgrinding,
856                                       ctdb_config.tdb_mutexes);
857
858 again:
859         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
860                                       ctdb->tunable.database_hash_size, 
861                                       tdb_flags, 
862                                       O_CREAT|O_RDWR, mode);
863         if (ctdb_db->ltdb == NULL) {
864                 struct stat st;
865                 int saved_errno = errno;
866
867                 if (! ctdb_db_persistent(ctdb_db)) {
868                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
869                                           ctdb_db->db_path,
870                                           saved_errno,
871                                           strerror(saved_errno)));
872                         talloc_free(ctdb_db);
873                         return -1;
874                 }
875
876                 if (remaining_tries == 0) {
877                         DEBUG(DEBUG_CRIT,(__location__
878                                           "Failed to open persistent tdb '%s': %d - %s\n",
879                                           ctdb_db->db_path,
880                                           saved_errno,
881                                           strerror(saved_errno)));
882                         talloc_free(ctdb_db);
883                         return -1;
884                 }
885
886                 ret = stat(ctdb_db->db_path, &st);
887                 if (ret != 0) {
888                         DEBUG(DEBUG_CRIT,(__location__
889                                           "Failed to open persistent tdb '%s': %d - %s\n",
890                                           ctdb_db->db_path,
891                                           saved_errno,
892                                           strerror(saved_errno)));
893                         talloc_free(ctdb_db);
894                         return -1;
895                 }
896
897                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
898                 if (ret != 0) {
899                         DEBUG(DEBUG_CRIT,(__location__
900                                           "Failed to open persistent tdb '%s': %d - %s\n",
901                                           ctdb_db->db_path,
902                                           saved_errno,
903                                           strerror(saved_errno)));
904                         talloc_free(ctdb_db);
905                         return -1;
906                 }
907
908                 remaining_tries--;
909                 mode = st.st_mode;
910                 goto again;
911         }
912
913         if (!ctdb_db_persistent(ctdb_db)) {
914                 ctdb_check_db_empty(ctdb_db);
915         } else {
916                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
917                 if (ret != 0) {
918                         int fd;
919                         struct stat st;
920
921                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
922                                           ctdb_db->db_path, ret,
923                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
924                         if (remaining_tries == 0) {
925                                 talloc_free(ctdb_db);
926                                 return -1;
927                         }
928
929                         fd = tdb_fd(ctdb_db->ltdb->tdb);
930                         ret = fstat(fd, &st);
931                         if (ret != 0) {
932                                 DEBUG(DEBUG_CRIT,(__location__
933                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
934                                                   ctdb_db->db_path,
935                                                   errno,
936                                                   strerror(errno)));
937                                 talloc_free(ctdb_db);
938                                 return -1;
939                         }
940
941                         /* close the TDB */
942                         talloc_free(ctdb_db->ltdb);
943                         ctdb_db->ltdb = NULL;
944
945                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
946                         if (ret != 0) {
947                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
948                                                   ctdb_db->db_path));
949                                 talloc_free(ctdb_db);
950                                 return -1;
951                         }
952
953                         remaining_tries--;
954                         mode = st.st_mode;
955                         goto again;
956                 }
957         }
958
959         /* remember the flags the client has specified */
960         tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
961
962
963         /* set up a rb tree we can use to track which records we have a 
964            fetch-lock in-flight for so we can defer any additional calls
965            for the same record.
966          */
967         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
968         if (ctdb_db->deferred_fetch == NULL) {
969                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
970                 talloc_free(ctdb_db);
971                 return -1;
972         }
973
974         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
975         if (ctdb_db->defer_dmaster == NULL) {
976                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
977                                   ctdb_db->db_name));
978                 talloc_free(ctdb_db);
979                 return -1;
980         }
981
982         DLIST_ADD(ctdb->db_list, ctdb_db);
983
984         /* setting this can help some high churn databases */
985         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
986
987         /* 
988            all databases support the "null" function. we need this in
989            order to do forced migration of records
990         */
991         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
992         if (ret != 0) {
993                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
994                 talloc_free(ctdb_db);
995                 return -1;
996         }
997
998         /* 
999            all databases support the "fetch" function. we need this
1000            for efficient Samba3 ctdb fetch
1001         */
1002         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1003         if (ret != 0) {
1004                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1005                 talloc_free(ctdb_db);
1006                 return -1;
1007         }
1008
1009         /* 
1010            all databases support the "fetch_with_header" function. we need this
1011            for efficient readonly record fetches
1012         */
1013         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1014         if (ret != 0) {
1015                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1016                 talloc_free(ctdb_db);
1017                 return -1;
1018         }
1019
1020         ret = ctdb_vacuum_init(ctdb_db);
1021         if (ret != 0) {
1022                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1023                                   "database '%s'\n", ctdb_db->db_name));
1024                 talloc_free(ctdb_db);
1025                 return -1;
1026         }
1027
1028         ret = ctdb_migration_init(ctdb_db);
1029         if (ret != 0) {
1030                 DEBUG(DEBUG_ERR,
1031                       ("Failed to setup migration tracking for db '%s'\n",
1032                        ctdb_db->db_name));
1033                 talloc_free(ctdb_db);
1034                 return -1;
1035         }
1036
1037         ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1038                            &ctdb_db->lock_log);
1039         if (ret != 0) {
1040                 DEBUG(DEBUG_ERR,
1041                       ("Failed to setup lock logging for db '%s'\n",
1042                        ctdb_db->db_name));
1043                 talloc_free(ctdb_db);
1044                 return -1;
1045         }
1046
1047         ctdb_db->generation = ctdb->vnn_map->generation;
1048
1049         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1050                             ctdb_db->db_path, tdb_flags));
1051
1052         /* success */
1053         return 0;
1054 }
1055
1056
1057 struct ctdb_deferred_attach_context {
1058         struct ctdb_deferred_attach_context *next, *prev;
1059         struct ctdb_context *ctdb;
1060         struct ctdb_req_control_old *c;
1061 };
1062
1063
1064 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1065 {
1066         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1067
1068         return 0;
1069 }
1070
1071 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1072                                          struct tevent_timer *te,
1073                                          struct timeval t, void *private_data)
1074 {
1075         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1076         struct ctdb_context *ctdb = da_ctx->ctdb;
1077
1078         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1079         talloc_free(da_ctx);
1080 }
1081
1082 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1083                                           struct tevent_timer *te,
1084                                           struct timeval t, void *private_data)
1085 {
1086         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1087         struct ctdb_context *ctdb = da_ctx->ctdb;
1088
1089         /* This talloc-steals the packet ->c */
1090         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1091         talloc_free(da_ctx);
1092 }
1093
1094 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1095 {
1096         struct ctdb_deferred_attach_context *da_ctx;
1097
1098         /* call it from the main event loop as soon as the current event 
1099            finishes.
1100          */
1101         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1102                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1103                 tevent_add_timer(ctdb->ev, da_ctx,
1104                                  timeval_current_ofs(1,0),
1105                                  ctdb_deferred_attach_callback, da_ctx);
1106         }
1107
1108         return 0;
1109 }
1110
1111 /*
1112   a client has asked to attach a new database
1113  */
1114 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb,
1115                                TDB_DATA indata,
1116                                TDB_DATA *outdata,
1117                                uint8_t db_flags,
1118                                uint32_t srcnode,
1119                                uint32_t client_id,
1120                                struct ctdb_req_control_old *c,
1121                                bool *async_reply)
1122 {
1123         const char *db_name = (const char *)indata.dptr;
1124         struct ctdb_db_context *db;
1125         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1126         struct ctdb_client *client = NULL;
1127         uint32_t opcode;
1128
1129         if (ctdb->tunable.allow_client_db_attach == 0) {
1130                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1131                                   "AllowClientDBAccess == 0\n", db_name));
1132                 return -1;
1133         }
1134
1135         /* don't allow any local clients to attach while we are in recovery mode
1136          * except for the recovery daemon.
1137          * allow all attach from the network since these are always from remote
1138          * recovery daemons.
1139          */
1140         if (srcnode == ctdb->pnn && client_id != 0) {
1141                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1142         }
1143         if (client != NULL) {
1144                 /* If the node is inactive it is not part of the cluster
1145                    and we should not allow clients to attach to any
1146                    databases
1147                 */
1148                 if (node->flags & NODE_FLAGS_INACTIVE) {
1149                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1150                         return -1;
1151                 }
1152
1153                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1154                     client->pid != ctdb->recoverd_pid &&
1155                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1156                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1157
1158                         if (da_ctx == NULL) {
1159                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1160                                 return -1;
1161                         }
1162
1163                         da_ctx->ctdb = ctdb;
1164                         da_ctx->c = talloc_steal(da_ctx, c);
1165                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1166                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1167
1168                         tevent_add_timer(ctdb->ev, da_ctx,
1169                                          timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1170                                          ctdb_deferred_attach_timeout, da_ctx);
1171
1172                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1173                         *async_reply = true;
1174                         return 0;
1175                 }
1176         }
1177
1178         /* see if we already have this name */
1179         db = ctdb_db_handle(ctdb, db_name);
1180         if (db) {
1181                 if ((db->db_flags & db_flags) != db_flags) {
1182                         DEBUG(DEBUG_ERR,
1183                               ("Error: Failed to re-attach with 0x%x flags,"
1184                                " database has 0x%x flags\n", db_flags,
1185                                db->db_flags));
1186                         return -1;
1187                 }
1188                 outdata->dptr  = (uint8_t *)&db->db_id;
1189                 outdata->dsize = sizeof(db->db_id);
1190                 return 0;
1191         }
1192
1193         if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
1194                 return -1;
1195         }
1196
1197         db = ctdb_db_handle(ctdb, db_name);
1198         if (!db) {
1199                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1200                 return -1;
1201         }
1202
1203         outdata->dptr  = (uint8_t *)&db->db_id;
1204         outdata->dsize = sizeof(db->db_id);
1205
1206         /* Try to ensure it's locked in mem */
1207         lockdown_memory(ctdb->valgrinding);
1208
1209         if (ctdb_db_persistent(db)) {
1210                 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1211         } else if (ctdb_db_replicated(db)) {
1212                 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1213         } else {
1214                 opcode = CTDB_CONTROL_DB_ATTACH;
1215         }
1216
1217         /* tell all the other nodes about this database */
1218         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, opcode,
1219                                  0, CTDB_CTRL_FLAG_NOREPLY,
1220                                  indata, NULL, NULL);
1221
1222         /* success */
1223         return 0;
1224 }
1225
1226 /*
1227  * a client has asked to detach from a database
1228  */
1229 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1230                                uint32_t client_id)
1231 {
1232         uint32_t db_id;
1233         struct ctdb_db_context *ctdb_db;
1234         struct ctdb_client *client = NULL;
1235
1236         db_id = *(uint32_t *)indata.dptr;
1237         ctdb_db = find_ctdb_db(ctdb, db_id);
1238         if (ctdb_db == NULL) {
1239                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1240                                   db_id));
1241                 return -1;
1242         }
1243
1244         if (ctdb->tunable.allow_client_db_attach == 1) {
1245                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1246                                   "Clients are allowed access to databases "
1247                                   "(AllowClientDBAccess == 1)\n",
1248                                   ctdb_db->db_name));
1249                 return -1;
1250         }
1251
1252         if (! ctdb_db_volatile(ctdb_db)) {
1253                 DEBUG(DEBUG_ERR,
1254                       ("Detaching non-volatile database %s denied\n",
1255                        ctdb_db->db_name));
1256                 return -1;
1257         }
1258
1259         /* Cannot detach from database when in recovery */
1260         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1261                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1262                 return -1;
1263         }
1264
1265         /* If a control comes from a client, then broadcast it to all nodes.
1266          * Do the actual detach only if the control comes from other daemons.
1267          */
1268         if (client_id != 0) {
1269                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1270                 if (client != NULL) {
1271                         /* forward the control to all the nodes */
1272                         ctdb_daemon_send_control(ctdb,
1273                                                  CTDB_BROADCAST_CONNECTED, 0,
1274                                                  CTDB_CONTROL_DB_DETACH, 0,
1275                                                  CTDB_CTRL_FLAG_NOREPLY,
1276                                                  indata, NULL, NULL);
1277                         return 0;
1278                 }
1279                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1280                                   "for database '%s'\n", ctdb_db->db_name));
1281                 return -1;
1282         }
1283
1284         /* Detach database from recoverd */
1285         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1286                                      CTDB_SRVID_DETACH_DATABASE,
1287                                      indata) != 0) {
1288                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1289                 return -1;
1290         }
1291
1292         /* Disable vacuuming and drop all vacuuming data */
1293         talloc_free(ctdb_db->vacuum_handle);
1294         talloc_free(ctdb_db->delete_queue);
1295
1296         /* Terminate any deferred fetch */
1297         talloc_free(ctdb_db->deferred_fetch);
1298
1299         /* Terminate any traverses */
1300         while (ctdb_db->traverse) {
1301                 talloc_free(ctdb_db->traverse);
1302         }
1303
1304         /* Terminate any revokes */
1305         while (ctdb_db->revokechild_active) {
1306                 talloc_free(ctdb_db->revokechild_active);
1307         }
1308
1309         /* Free readonly tracking database */
1310         if (ctdb_db_readonly(ctdb_db)) {
1311                 talloc_free(ctdb_db->rottdb);
1312         }
1313
1314         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1315
1316         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1317                              ctdb_db->db_name));
1318         talloc_free(ctdb_db);
1319
1320         return 0;
1321 }
1322
1323 /*
1324   attach to all existing persistent databases
1325  */
1326 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1327                                   const char *unhealthy_reason)
1328 {
1329         DIR *d;
1330         struct dirent *de;
1331
1332         /* open the persistent db directory and scan it for files */
1333         d = opendir(ctdb->db_directory_persistent);
1334         if (d == NULL) {
1335                 return 0;
1336         }
1337
1338         while ((de=readdir(d))) {
1339                 char *p, *s, *q;
1340                 size_t len = strlen(de->d_name);
1341                 uint32_t node;
1342                 int invalid_name = 0;
1343                 
1344                 s = talloc_strdup(ctdb, de->d_name);
1345                 if (s == NULL) {
1346                         closedir(d);
1347                         CTDB_NO_MEMORY(ctdb, s);
1348                 }
1349
1350                 /* only accept names ending in .tdb */
1351                 p = strstr(s, ".tdb.");
1352                 if (len < 7 || p == NULL) {
1353                         talloc_free(s);
1354                         continue;
1355                 }
1356
1357                 /* only accept names ending with .tdb. and any number of digits */
1358                 q = p+5;
1359                 while (*q != 0 && invalid_name == 0) {
1360                         if (!isdigit(*q++)) {
1361                                 invalid_name = 1;
1362                         }
1363                 }
1364                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1365                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1366                         talloc_free(s);
1367                         continue;
1368                 }
1369                 p[4] = 0;
1370
1371                 if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
1372                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1373                         closedir(d);
1374                         talloc_free(s);
1375                         return -1;
1376                 }
1377
1378                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1379
1380                 talloc_free(s);
1381         }
1382         closedir(d);
1383         return 0;
1384 }
1385
1386 int ctdb_attach_databases(struct ctdb_context *ctdb)
1387 {
1388         int ret;
1389         char *persistent_health_path = NULL;
1390         char *unhealthy_reason = NULL;
1391         bool first_try = true;
1392
1393         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1394                                                  ctdb->db_directory_state,
1395                                                  PERSISTENT_HEALTH_TDB,
1396                                                  ctdb->pnn);
1397         if (persistent_health_path == NULL) {
1398                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1399                 return -1;
1400         }
1401
1402 again:
1403
1404         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1405                                                    0, TDB_DISALLOW_NESTING,
1406                                                    O_CREAT | O_RDWR, 0600);
1407         if (ctdb->db_persistent_health == NULL) {
1408                 struct tdb_wrap *tdb;
1409
1410                 if (!first_try) {
1411                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1412                                           persistent_health_path,
1413                                           errno,
1414                                           strerror(errno)));
1415                         talloc_free(persistent_health_path);
1416                         talloc_free(unhealthy_reason);
1417                         return -1;
1418                 }
1419                 first_try = false;
1420
1421                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1422                                                    persistent_health_path,
1423                                                    "was cleared after a failure",
1424                                                    "manual verification needed");
1425                 if (unhealthy_reason == NULL) {
1426                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1427                         talloc_free(persistent_health_path);
1428                         return -1;
1429                 }
1430
1431                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1432                                   persistent_health_path));
1433                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1434                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1435                                     O_CREAT | O_RDWR, 0600);
1436                 if (tdb) {
1437                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1438                                           persistent_health_path,
1439                                           errno,
1440                                           strerror(errno)));
1441                         talloc_free(persistent_health_path);
1442                         talloc_free(unhealthy_reason);
1443                         return -1;
1444                 }
1445
1446                 talloc_free(tdb);
1447                 goto again;
1448         }
1449         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1450         if (ret != 0) {
1451                 struct tdb_wrap *tdb;
1452
1453                 talloc_free(ctdb->db_persistent_health);
1454                 ctdb->db_persistent_health = NULL;
1455
1456                 if (!first_try) {
1457                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1458                                           persistent_health_path));
1459                         talloc_free(persistent_health_path);
1460                         talloc_free(unhealthy_reason);
1461                         return -1;
1462                 }
1463                 first_try = false;
1464
1465                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1466                                                    persistent_health_path,
1467                                                    "was cleared after a failure",
1468                                                    "manual verification needed");
1469                 if (unhealthy_reason == NULL) {
1470                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1471                         talloc_free(persistent_health_path);
1472                         return -1;
1473                 }
1474
1475                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1476                                   persistent_health_path));
1477                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1478                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1479                                     O_CREAT | O_RDWR, 0600);
1480                 if (tdb) {
1481                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1482                                           persistent_health_path,
1483                                           errno,
1484                                           strerror(errno)));
1485                         talloc_free(persistent_health_path);
1486                         talloc_free(unhealthy_reason);
1487                         return -1;
1488                 }
1489
1490                 talloc_free(tdb);
1491                 goto again;
1492         }
1493         talloc_free(persistent_health_path);
1494
1495         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1496         talloc_free(unhealthy_reason);
1497         if (ret != 0) {
1498                 return ret;
1499         }
1500
1501         return 0;
1502 }
1503
1504 /*
1505   called when a broadcast seqnum update comes in
1506  */
1507 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1508 {
1509         struct ctdb_db_context *ctdb_db;
1510         if (srcnode == ctdb->pnn) {
1511                 /* don't update ourselves! */
1512                 return 0;
1513         }
1514
1515         ctdb_db = find_ctdb_db(ctdb, db_id);
1516         if (!ctdb_db) {
1517                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1518                 return -1;
1519         }
1520
1521         if (ctdb_db->unhealthy_reason) {
1522                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1523                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1524                 return -1;
1525         }
1526
1527         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1528         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1529         return 0;
1530 }
1531
1532 /*
1533   timer to check for seqnum changes in a ltdb and propagate them
1534  */
1535 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1536                                    struct tevent_timer *te,
1537                                    struct timeval t, void *p)
1538 {
1539         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1540         struct ctdb_context *ctdb = ctdb_db->ctdb;
1541         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1542         if (new_seqnum != ctdb_db->seqnum) {
1543                 /* something has changed - propagate it */
1544                 TDB_DATA data;
1545                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1546                 data.dsize = sizeof(uint32_t);
1547                 ctdb_daemon_send_control(ctdb,
1548                                          CTDB_BROADCAST_ACTIVE,
1549                                          0,
1550                                          CTDB_CONTROL_UPDATE_SEQNUM,
1551                                          0,
1552                                          CTDB_CTRL_FLAG_NOREPLY,
1553                                          data,
1554                                          NULL,
1555                                          NULL);
1556         }
1557         ctdb_db->seqnum = new_seqnum;
1558
1559         /* setup a new timer */
1560         ctdb_db->seqnum_update =
1561                 tevent_add_timer(ctdb->ev, ctdb_db,
1562                                  timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1563                                                      (ctdb->tunable.seqnum_interval%1000)*1000),
1564                                  ctdb_ltdb_seqnum_check, ctdb_db);
1565 }
1566
1567 /*
1568   enable seqnum handling on this db
1569  */
1570 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1571 {
1572         struct ctdb_db_context *ctdb_db;
1573         ctdb_db = find_ctdb_db(ctdb, db_id);
1574         if (!ctdb_db) {
1575                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1576                 return -1;
1577         }
1578
1579         if (ctdb_db->seqnum_update == NULL) {
1580                 ctdb_db->seqnum_update = tevent_add_timer(
1581                         ctdb->ev, ctdb_db,
1582                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1583                                             (ctdb->tunable.seqnum_interval%1000)*1000),
1584                         ctdb_ltdb_seqnum_check, ctdb_db);
1585         }
1586
1587         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1588         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1589         return 0;
1590 }
1591
1592 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1593 {
1594         if (ctdb_db_sticky(ctdb_db)) {
1595                 return 0;
1596         }
1597
1598         if (! ctdb_db_volatile(ctdb_db)) {
1599                 DEBUG(DEBUG_ERR,
1600                       ("Non-volatile databases do not support sticky flag\n"));
1601                 return -1;
1602         }
1603
1604         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1605
1606         ctdb_db_set_sticky(ctdb_db);
1607
1608         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1609
1610         return 0;
1611 }
1612
1613 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1614 {
1615         struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1616         int i;
1617
1618         for (i=0; i<MAX_HOT_KEYS; i++) {
1619                 if (s->hot_keys[i].key.dsize > 0) {
1620                         talloc_free(s->hot_keys[i].key.dptr);
1621                 }
1622         }
1623
1624         ZERO_STRUCT(ctdb_db->statistics);
1625 }
1626
1627 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1628                                 uint32_t db_id,
1629                                 TDB_DATA *outdata)
1630 {
1631         struct ctdb_db_context *ctdb_db;
1632         struct ctdb_db_statistics_old *stats;
1633         int i;
1634         int len;
1635         char *ptr;
1636
1637         ctdb_db = find_ctdb_db(ctdb, db_id);
1638         if (!ctdb_db) {
1639                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1640                 return -1;
1641         }
1642
1643         len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1644         for (i = 0; i < MAX_HOT_KEYS; i++) {
1645                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1646         }
1647
1648         stats = talloc_size(outdata, len);
1649         if (stats == NULL) {
1650                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1651                 return -1;
1652         }
1653
1654         memcpy(stats, &ctdb_db->statistics,
1655                offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1656
1657         stats->num_hot_keys = MAX_HOT_KEYS;
1658
1659         ptr = &stats->hot_keys_wire[0];
1660         for (i = 0; i < MAX_HOT_KEYS; i++) {
1661                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1662                        ctdb_db->statistics.hot_keys[i].key.dsize);
1663                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1664         }
1665
1666         outdata->dptr  = (uint8_t *)stats;
1667         outdata->dsize = len;
1668
1669         return 0;
1670 }