ctdb: Make TDB_SEQNUM work synchronously with ctdb
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 #include "server/ctdb_config.h"
45
46 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
47
48 /**
49  * write a record to a normal database
50  *
51  * This is the server-variant of the ctdb_ltdb_store function.
52  * It contains logic to determine whether a record should be
53  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
54  * controls to the local ctdb daemon if apporpriate.
55  */
56 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
57                                   TDB_DATA key,
58                                   struct ctdb_ltdb_header *header,
59                                   TDB_DATA data)
60 {
61         struct ctdb_context *ctdb = ctdb_db->ctdb;
62         TDB_DATA rec[2];
63         uint32_t hsize = sizeof(struct ctdb_ltdb_header);
64         int ret;
65         bool keep = false;
66         bool schedule_for_deletion = false;
67         bool remove_from_delete_queue = false;
68         uint32_t lmaster;
69
70         if (ctdb->flags & CTDB_FLAG_TORTURE) {
71                 TDB_DATA old;
72                 struct ctdb_ltdb_header *h2;
73
74                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
75                 h2 = (struct ctdb_ltdb_header *)old.dptr;
76                 if (old.dptr != NULL &&
77                     old.dsize >= hsize &&
78                     h2->rsn > header->rsn) {
79                         DEBUG(DEBUG_ERR,
80                               ("RSN regression! %"PRIu64" %"PRIu64"\n",
81                                h2->rsn, header->rsn));
82                 }
83                 if (old.dptr) {
84                         free(old.dptr);
85                 }
86         }
87
88         if (ctdb->vnn_map == NULL) {
89                 /*
90                  * Called from a client: always store the record
91                  * Also don't call ctdb_lmaster since it uses the vnn_map!
92                  */
93                 keep = true;
94                 goto store;
95         }
96
97         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
98
99         /*
100          * If we migrate an empty record off to another node
101          * and the record has not been migrated with data,
102          * delete the record instead of storing the empty record.
103          */
104         if (data.dsize != 0) {
105                 keep = true;
106         } else if (header->flags & CTDB_REC_RO_FLAGS) {
107                 keep = true;
108         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
109                 /*
110                  * The record is not created by the client but
111                  * automatically by the ctdb_ltdb_fetch logic that
112                  * creates a record with an initial header in the
113                  * ltdb before trying to migrate the record from
114                  * the current lmaster. Keep it instead of trying
115                  * to delete the non-existing record...
116                  */
117                 keep = true;
118                 schedule_for_deletion = true;
119         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
120                 keep = true;
121         } else if (ctdb_db->ctdb->pnn == lmaster) {
122                 /*
123                  * If we are lmaster, then we usually keep the record.
124                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
125                  * and the record is empty and has never been migrated
126                  * with data, then we should delete it instead of storing it.
127                  * This is part of the vacuuming process.
128                  *
129                  * The reason that we usually need to store even empty records
130                  * on the lmaster is that a client operating directly on the
131                  * lmaster (== dmaster) expects the local copy of the record to
132                  * exist after successful ctdb migrate call. If the record does
133                  * not exist, the client goes into a migrate loop and eventually
134                  * fails. So storing the empty record makes sure that we do not
135                  * need to change the client code.
136                  */
137                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
138                         keep = true;
139                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
140                         keep = true;
141                 }
142         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
143                 keep = true;
144         }
145
146         if (keep) {
147                 if (ctdb_db_volatile(ctdb_db) &&
148                     (ctdb_db->ctdb->pnn == header->dmaster) &&
149                     !(header->flags & CTDB_REC_RO_FLAGS))
150                 {
151                         header->rsn++;
152
153                         if (data.dsize == 0) {
154                                 schedule_for_deletion = true;
155                         }
156                 }
157                 remove_from_delete_queue = !schedule_for_deletion;
158         }
159
160 store:
161         /*
162          * The VACUUM_MIGRATED flag is only set temporarily for
163          * the above logic when the record was retrieved by a
164          * VACUUM_MIGRATE call and should not be stored in the
165          * database.
166          *
167          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
168          * and there are two cases in which the corresponding record
169          * is stored in the local database:
170          * 1. The record has been migrated with data in the past
171          *    (the MIGRATED_WITH_DATA record flag is set).
172          * 2. The record has been filled with data again since it
173          *    had been submitted in the VACUUM_FETCH message to the
174          *    lmaster.
175          * For such records it is important to not store the
176          * VACUUM_MIGRATED flag in the database.
177          */
178         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
179
180         /*
181          * Similarly, clear the AUTOMATIC flag which should not enter
182          * the local database copy since this would require client
183          * modifications to clear the flag when the client stores
184          * the record.
185          */
186         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
187
188         rec[0].dsize = hsize;
189         rec[0].dptr = (uint8_t *)header;
190
191         rec[1].dsize = data.dsize;
192         rec[1].dptr = data.dptr;
193
194         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
195                             ctdb_db->db_name,
196                             keep?"storing":"deleting",
197                             ctdb_hash(&key)));
198
199         if (keep) {
200                 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
201         } else {
202                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
203         }
204
205         if (ret != 0) {
206                 int lvl = DEBUG_ERR;
207
208                 if (keep == false &&
209                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
210                 {
211                         lvl = DEBUG_DEBUG;
212                 }
213
214                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
215                             "%d - %s\n",
216                             ctdb_db->db_name,
217                             keep?"store":"delete", ret,
218                             tdb_errorstr(ctdb_db->ltdb->tdb)));
219
220                 schedule_for_deletion = false;
221                 remove_from_delete_queue = false;
222         }
223
224         if (schedule_for_deletion) {
225                 int ret2;
226                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
227                 if (ret2 != 0) {
228                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
229                 }
230         }
231
232         if (remove_from_delete_queue) {
233                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
234         }
235
236         return ret;
237 }
238
/*
  state carried across a deferred record lock request: filled in by
  ctdb_ltdb_lock_requeue() and consumed by lock_fetch_callback()
 */
struct lock_fetch_state {
        /* daemon context, copied from ctdb_db->ctdb */
        struct ctdb_context *ctdb;
        /* database the record lock was requested on */
        struct ctdb_db_context *ctdb_db;
        /* function the deferred packet is re-submitted to */
        void (*recv_pkt)(void *, struct ctdb_req_header *);
        /* first argument handed to recv_pkt */
        void *recv_context;
        /* the deferred request packet */
        struct ctdb_req_header *hdr;
        /* value of ctdb_db->generation when the request was queued */
        uint32_t generation;
        /* when true the callback skips the generation comparison */
        bool ignore_generation;
};
248
249 /*
250   called when we should retry the operation
251  */
252 static void lock_fetch_callback(void *p, bool locked)
253 {
254         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
255         if (!state->ignore_generation &&
256             state->generation != state->ctdb_db->generation) {
257                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
258                 talloc_free(state->hdr);
259                 return;
260         }
261         state->recv_pkt(state->recv_context, state->hdr);
262         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
263 }
264
265
266 /*
267   do a non-blocking ltdb_lock, deferring this ctdb request until we
268   have the chainlock
269
270   It does the following:
271
272    1) tries to get the chainlock. If it succeeds, then it returns 0
273
274    2) if it fails to get a chainlock immediately then it sets up a
275    non-blocking chainlock via ctdb_lock_record, and when it gets the
276    chainlock it re-submits this ctdb request to the main packet
277    receive function.
278
279    This effectively queues all ctdb requests that cannot be
280    immediately satisfied until it can get the lock. This means that
281    the main ctdb daemon will not block waiting for a chainlock held by
282    a client
283
284    There are 3 possible return values:
285
286        0:    means that it got the lock immediately.
287       -1:    means that it failed to get the lock, and won't retry
288       -2:    means that it failed to get the lock immediately, but will retry
289  */
290 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
291                            TDB_DATA key, struct ctdb_req_header *hdr,
292                            void (*recv_pkt)(void *, struct ctdb_req_header *),
293                            void *recv_context, bool ignore_generation)
294 {
295         int ret;
296         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
297         struct lock_request *lreq;
298         struct lock_fetch_state *state;
299         
300         ret = tdb_chainlock_nonblock(tdb, key);
301
302         if (ret != 0 &&
303             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
304                 /* a hard failure - don't try again */
305                 return -1;
306         }
307
308         /* when torturing, ensure we test the contended path */
309         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
310             random() % 5 == 0) {
311                 ret = -1;
312                 tdb_chainunlock(tdb, key);
313         }
314
315         /* first the non-contended path */
316         if (ret == 0) {
317                 return 0;
318         }
319
320         state = talloc(hdr, struct lock_fetch_state);
321         state->ctdb = ctdb_db->ctdb;
322         state->ctdb_db = ctdb_db;
323         state->hdr = hdr;
324         state->recv_pkt = recv_pkt;
325         state->recv_context = recv_context;
326         state->generation = ctdb_db->generation;
327         state->ignore_generation = ignore_generation;
328
329         /* now the contended path */
330         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
331         if (lreq == NULL) {
332                 return -1;
333         }
334
335         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
336            so it won't be freed yet */
337         talloc_steal(state, hdr);
338
339         /* now tell the caller than we will retry asynchronously */
340         return -2;
341 }
342
343 /*
344   a varient of ctdb_ltdb_lock_requeue that also fetches the record
345  */
346 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
347                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
348                                  struct ctdb_req_header *hdr, TDB_DATA *data,
349                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
350                                  void *recv_context, bool ignore_generation)
351 {
352         int ret;
353
354         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
355                                      recv_context, ignore_generation);
356         if (ret != 0) {
357                 return ret;
358         }
359
360         ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
361         if (ret != 0) {
362                 int uret;
363                 uret = ctdb_ltdb_unlock(ctdb_db, key);
364                 if (uret != 0) {
365                         DBG_ERR("ctdb_ltdb_unlock() failed with error %d\n",
366                                 uret);
367                 }
368         }
369         return ret;
370 }
371
372
373 /*
374   paranoid check to see if the db is empty
375  */
376 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
377 {
378         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
379         int count = tdb_traverse_read(tdb, NULL, NULL);
380         if (count != 0) {
381                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
382                          ctdb_db->db_path));
383                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
384         }
385 }
386
387 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
388                                 struct ctdb_db_context *ctdb_db)
389 {
390         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
391         char *old;
392         char *reason = NULL;
393         TDB_DATA key;
394         TDB_DATA val;
395
396         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
397         key.dsize = strlen(ctdb_db->db_name);
398
399         old = ctdb_db->unhealthy_reason;
400         ctdb_db->unhealthy_reason = NULL;
401
402         val = tdb_fetch(tdb, key);
403         if (val.dsize > 0) {
404                 reason = talloc_strndup(ctdb_db,
405                                         (const char *)val.dptr,
406                                         val.dsize);
407                 if (reason == NULL) {
408                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
409                                            (int)val.dsize));
410                         ctdb_db->unhealthy_reason = old;
411                         free(val.dptr);
412                         return -1;
413                 }
414         }
415
416         if (val.dptr) {
417                 free(val.dptr);
418         }
419
420         talloc_free(old);
421         ctdb_db->unhealthy_reason = reason;
422         return 0;
423 }
424
425 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
426                                   struct ctdb_db_context *ctdb_db,
427                                   const char *given_reason,/* NULL means healthy */
428                                   int num_healthy_nodes)
429 {
430         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
431         int ret;
432         TDB_DATA key;
433         TDB_DATA val;
434         char *new_reason = NULL;
435         char *old_reason = NULL;
436
437         ret = tdb_transaction_start(tdb);
438         if (ret != 0) {
439                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
440                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
441                 return -1;
442         }
443
444         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
445         if (ret != 0) {
446                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
447                                    ctdb_db->db_name, ret));
448                 return -1;
449         }
450         old_reason = ctdb_db->unhealthy_reason;
451
452         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
453         key.dsize = strlen(ctdb_db->db_name);
454
455         if (given_reason) {
456                 new_reason = talloc_strdup(ctdb_db, given_reason);
457                 if (new_reason == NULL) {
458                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
459                                           given_reason));
460                         return -1;
461                 }
462         } else if (old_reason && num_healthy_nodes == 0) {
463                 /*
464                  * If the reason indicates ok, but there where no healthy nodes
465                  * available, that it means, we have not recovered valid content
466                  * of the db. So if there's an old reason, prefix it with
467                  * "NO-HEALTHY-NODES - "
468                  */
469                 const char *prefix;
470
471 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
472                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
473                 if (ret != 0) {
474                         prefix = _TMP_PREFIX;
475                 } else {
476                         prefix = "";
477                 }
478                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
479                                          prefix, old_reason);
480                 if (new_reason == NULL) {
481                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
482                                           prefix, old_reason));
483                         return -1;
484                 }
485 #undef _TMP_PREFIX
486         }
487
488         if (new_reason) {
489                 val.dptr = discard_const_p(uint8_t, new_reason);
490                 val.dsize = strlen(new_reason);
491
492                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
493                 if (ret != 0) {
494                         tdb_transaction_cancel(tdb);
495                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
496                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
497                                            ret, tdb_errorstr(tdb)));
498                         talloc_free(new_reason);
499                         return -1;
500                 }
501                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
502                                    ctdb_db->db_name, new_reason));
503         } else if (old_reason) {
504                 ret = tdb_delete(tdb, key);
505                 if (ret != 0) {
506                         tdb_transaction_cancel(tdb);
507                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
508                                            tdb_name(tdb), ctdb_db->db_name,
509                                            ret, tdb_errorstr(tdb)));
510                         talloc_free(new_reason);
511                         return -1;
512                 }
513                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
514                                    ctdb_db->db_name));
515         }
516
517         ret = tdb_transaction_commit(tdb);
518         if (ret != TDB_SUCCESS) {
519                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
520                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
521                 talloc_free(new_reason);
522                 return -1;
523         }
524
525         talloc_free(old_reason);
526         ctdb_db->unhealthy_reason = new_reason;
527
528         return 0;
529 }
530
531 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
532                                      struct ctdb_db_context *ctdb_db)
533 {
534         time_t now = time(NULL);
535         char *new_path;
536         char *new_reason;
537         int ret;
538         struct tm *tm;
539
540         tm = gmtime(&now);
541
542         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
543         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
544                                    "%04u%02u%02u%02u%02u%02u.0Z",
545                                    ctdb_db->db_path,
546                                    tm->tm_year+1900, tm->tm_mon+1,
547                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
548                                    tm->tm_sec);
549         if (new_path == NULL) {
550                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
551                 return -1;
552         }
553
554         new_reason = talloc_asprintf(ctdb_db,
555                                      "ERROR - Backup of corrupted TDB in '%s'",
556                                      new_path);
557         if (new_reason == NULL) {
558                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
559                 return -1;
560         }
561         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
562         talloc_free(new_reason);
563         if (ret != 0) {
564                 DEBUG(DEBUG_CRIT,(__location__
565                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
566                                  ctdb_db->db_path));
567                 return -1;
568         }
569
570         ret = rename(ctdb_db->db_path, new_path);
571         if (ret != 0) {
572                 DEBUG(DEBUG_CRIT,(__location__
573                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
574                                   ctdb_db->db_path, new_path,
575                                   errno, strerror(errno)));
576                 talloc_free(new_path);
577                 return -1;
578         }
579
580         DEBUG(DEBUG_CRIT,(__location__
581                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
582                          ctdb_db->db_path, new_path));
583         talloc_free(new_path);
584         return 0;
585 }
586
587 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
588 {
589         struct ctdb_db_context *ctdb_db;
590         int ret;
591         int ok = 0;
592         int fail = 0;
593
594         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
595                 if (!ctdb_db_persistent(ctdb_db)) {
596                         continue;
597                 }
598
599                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
600                 if (ret != 0) {
601                         DEBUG(DEBUG_ALERT,(__location__
602                                            " load persistent health for '%s' failed\n",
603                                            ctdb_db->db_path));
604                         return -1;
605                 }
606
607                 if (ctdb_db->unhealthy_reason == NULL) {
608                         ok++;
609                         DEBUG(DEBUG_INFO,(__location__
610                                    " persistent db '%s' healthy\n",
611                                    ctdb_db->db_path));
612                         continue;
613                 }
614
615                 fail++;
616                 DEBUG(DEBUG_ALERT,(__location__
617                                    " persistent db '%s' unhealthy: %s\n",
618                                    ctdb_db->db_path,
619                                    ctdb_db->unhealthy_reason));
620         }
621         DEBUG(DEBUG_NOTICE,
622               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
623                ok, fail));
624
625         if (fail != 0) {
626                 return -1;
627         }
628
629         return 0;
630 }
631
632
633 /*
634   mark a database - as healthy
635  */
636 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
637 {
638         uint32_t db_id = *(uint32_t *)indata.dptr;
639         struct ctdb_db_context *ctdb_db;
640         int ret;
641         bool may_recover = false;
642
643         ctdb_db = find_ctdb_db(ctdb, db_id);
644         if (!ctdb_db) {
645                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
646                 return -1;
647         }
648
649         if (ctdb_db->unhealthy_reason) {
650                 may_recover = true;
651         }
652
653         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
654         if (ret != 0) {
655                 DEBUG(DEBUG_ERR,(__location__
656                                  " ctdb_update_persistent_health(%s) failed\n",
657                                  ctdb_db->db_name));
658                 return -1;
659         }
660
661         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
662                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
663                                   ctdb_db->db_name));
664                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
665         }
666
667         return 0;
668 }
669
670 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
671                                    TDB_DATA indata,
672                                    TDB_DATA *outdata)
673 {
674         uint32_t db_id = *(uint32_t *)indata.dptr;
675         struct ctdb_db_context *ctdb_db;
676         int ret;
677
678         ctdb_db = find_ctdb_db(ctdb, db_id);
679         if (!ctdb_db) {
680                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
681                 return -1;
682         }
683
684         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
685         if (ret != 0) {
686                 DEBUG(DEBUG_ERR,(__location__
687                                  " ctdb_load_persistent_health(%s) failed\n",
688                                  ctdb_db->db_name));
689                 return -1;
690         }
691
692         *outdata = tdb_null;
693         if (ctdb_db->unhealthy_reason) {
694                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
695                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
696         }
697
698         return 0;
699 }
700
701
702 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
703 {
704         char *ropath;
705
706         if (ctdb_db_readonly(ctdb_db)) {
707                 return 0;
708         }
709
710         if (! ctdb_db_volatile(ctdb_db)) {
711                 DEBUG(DEBUG_ERR,
712                       ("Non-volatile databases do not support readonly flag\n"));
713                 return -1;
714         }
715
716         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
717         if (ropath == NULL) {
718                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
719                 return -1;
720         }
721         ctdb_db->rottdb = tdb_open(ropath, 
722                               ctdb->tunable.database_hash_size, 
723                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
724                               O_CREAT|O_RDWR, 0600);
725         if (ctdb_db->rottdb == NULL) {
726                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
727                 talloc_free(ropath);
728                 return -1;
729         }
730
731         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
732
733         ctdb_db_set_readonly(ctdb_db);
734
735         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
736
737         talloc_free(ropath);
738         return 0;
739 }
740
741 /*
742   attach to a database, handling both persistent and non-persistent databases
743   return 0 on success, -1 on failure
744  */
745 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
746                              uint8_t db_flags, const char *unhealthy_reason)
747 {
748         struct ctdb_db_context *ctdb_db, *tmp_db;
749         int ret;
750         struct TDB_DATA key;
751         int tdb_flags;
752         int mode = 0600;
753         int remaining_tries = 0;
754
755         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
756         CTDB_NO_MEMORY(ctdb, ctdb_db);
757
758         ctdb_db->ctdb = ctdb;
759         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
760         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
761
762         key.dsize = strlen(db_name)+1;
763         key.dptr  = discard_const(db_name);
764         ctdb_db->db_id = ctdb_hash(&key);
765         ctdb_db->db_flags = db_flags;
766
767         if (ctdb_db_volatile(ctdb_db)) {
768                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
769                 if (ctdb_db->delete_queue == NULL) {
770                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
771                 }
772
773                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
774         }
775
776         /* check for hash collisions */
777         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
778                 if (tmp_db->db_id == ctdb_db->db_id) {
779                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
780                                  tmp_db->db_id, db_name, tmp_db->db_name));
781                         talloc_free(ctdb_db);
782                         return -1;
783                 }
784         }
785
786         if (ctdb_db_persistent(ctdb_db)) {
787                 if (unhealthy_reason) {
788                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
789                                                             unhealthy_reason, 0);
790                         if (ret != 0) {
791                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
792                                                    ctdb_db->db_name, unhealthy_reason, ret));
793                                 talloc_free(ctdb_db);
794                                 return -1;
795                         }
796                 }
797
798                 if (ctdb->max_persistent_check_errors > 0) {
799                         remaining_tries = 1;
800                 }
801                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
802                         remaining_tries = 0;
803                 }
804
805                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
806                 if (ret != 0) {
807                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
808                                    ctdb_db->db_name, ret));
809                         talloc_free(ctdb_db);
810                         return -1;
811                 }
812         }
813
814         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
815                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
816                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
817                 talloc_free(ctdb_db);
818                 return -1;
819         }
820
821         if (ctdb_db->unhealthy_reason) {
822                 /* this is just a warning, but we want that in the log file! */
823                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
824                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
825         }
826
827         /* open the database */
828         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
829                                            ctdb_db_persistent(ctdb_db) ?
830                                                 ctdb->db_directory_persistent :
831                                                 ctdb->db_directory,
832                                            db_name, ctdb->pnn);
833
834         tdb_flags = ctdb_db_tdb_flags(db_flags,
835                                       ctdb->valgrinding,
836                                       ctdb_config.tdb_mutexes);
837
838 again:
839         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
840                                       ctdb->tunable.database_hash_size, 
841                                       tdb_flags, 
842                                       O_CREAT|O_RDWR, mode);
843         if (ctdb_db->ltdb == NULL) {
844                 struct stat st;
845                 int saved_errno = errno;
846
847                 if (! ctdb_db_persistent(ctdb_db)) {
848                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
849                                           ctdb_db->db_path,
850                                           saved_errno,
851                                           strerror(saved_errno)));
852                         talloc_free(ctdb_db);
853                         return -1;
854                 }
855
856                 if (remaining_tries == 0) {
857                         DEBUG(DEBUG_CRIT,(__location__
858                                           "Failed to open persistent tdb '%s': %d - %s\n",
859                                           ctdb_db->db_path,
860                                           saved_errno,
861                                           strerror(saved_errno)));
862                         talloc_free(ctdb_db);
863                         return -1;
864                 }
865
866                 ret = stat(ctdb_db->db_path, &st);
867                 if (ret != 0) {
868                         DEBUG(DEBUG_CRIT,(__location__
869                                           "Failed to open persistent tdb '%s': %d - %s\n",
870                                           ctdb_db->db_path,
871                                           saved_errno,
872                                           strerror(saved_errno)));
873                         talloc_free(ctdb_db);
874                         return -1;
875                 }
876
877                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
878                 if (ret != 0) {
879                         DEBUG(DEBUG_CRIT,(__location__
880                                           "Failed to open persistent tdb '%s': %d - %s\n",
881                                           ctdb_db->db_path,
882                                           saved_errno,
883                                           strerror(saved_errno)));
884                         talloc_free(ctdb_db);
885                         return -1;
886                 }
887
888                 remaining_tries--;
889                 mode = st.st_mode;
890                 goto again;
891         }
892
893         if (!ctdb_db_persistent(ctdb_db)) {
894                 ctdb_check_db_empty(ctdb_db);
895         } else {
896                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
897                 if (ret != 0) {
898                         int fd;
899                         struct stat st;
900
901                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
902                                           ctdb_db->db_path, ret,
903                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
904                         if (remaining_tries == 0) {
905                                 talloc_free(ctdb_db);
906                                 return -1;
907                         }
908
909                         fd = tdb_fd(ctdb_db->ltdb->tdb);
910                         ret = fstat(fd, &st);
911                         if (ret != 0) {
912                                 DEBUG(DEBUG_CRIT,(__location__
913                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
914                                                   ctdb_db->db_path,
915                                                   errno,
916                                                   strerror(errno)));
917                                 talloc_free(ctdb_db);
918                                 return -1;
919                         }
920
921                         /* close the TDB */
922                         talloc_free(ctdb_db->ltdb);
923                         ctdb_db->ltdb = NULL;
924
925                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
926                         if (ret != 0) {
927                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
928                                                   ctdb_db->db_path));
929                                 talloc_free(ctdb_db);
930                                 return -1;
931                         }
932
933                         remaining_tries--;
934                         mode = st.st_mode;
935                         goto again;
936                 }
937         }
938
939         /* remember the flags the client has specified */
940         tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
941
942
943         /* set up a rb tree we can use to track which records we have a 
944            fetch-lock in-flight for so we can defer any additional calls
945            for the same record.
946          */
947         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
948         if (ctdb_db->deferred_fetch == NULL) {
949                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
950                 talloc_free(ctdb_db);
951                 return -1;
952         }
953
954         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
955         if (ctdb_db->defer_dmaster == NULL) {
956                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
957                                   ctdb_db->db_name));
958                 talloc_free(ctdb_db);
959                 return -1;
960         }
961
962         DLIST_ADD(ctdb->db_list, ctdb_db);
963
964         /* setting this can help some high churn databases */
965         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
966
967         /* 
968            all databases support the "null" function. we need this in
969            order to do forced migration of records
970         */
971         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
972         if (ret != 0) {
973                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
974                 talloc_free(ctdb_db);
975                 return -1;
976         }
977
978         /* 
979            all databases support the "fetch" function. we need this
980            for efficient Samba3 ctdb fetch
981         */
982         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
983         if (ret != 0) {
984                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
985                 talloc_free(ctdb_db);
986                 return -1;
987         }
988
989         /* 
990            all databases support the "fetch_with_header" function. we need this
991            for efficient readonly record fetches
992         */
993         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
994         if (ret != 0) {
995                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
996                 talloc_free(ctdb_db);
997                 return -1;
998         }
999
1000         ret = ctdb_vacuum_init(ctdb_db);
1001         if (ret != 0) {
1002                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1003                                   "database '%s'\n", ctdb_db->db_name));
1004                 talloc_free(ctdb_db);
1005                 return -1;
1006         }
1007
1008         ret = ctdb_migration_init(ctdb_db);
1009         if (ret != 0) {
1010                 DEBUG(DEBUG_ERR,
1011                       ("Failed to setup migration tracking for db '%s'\n",
1012                        ctdb_db->db_name));
1013                 talloc_free(ctdb_db);
1014                 return -1;
1015         }
1016
1017         ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1018                            &ctdb_db->lock_log);
1019         if (ret != 0) {
1020                 DEBUG(DEBUG_ERR,
1021                       ("Failed to setup lock logging for db '%s'\n",
1022                        ctdb_db->db_name));
1023                 talloc_free(ctdb_db);
1024                 return -1;
1025         }
1026
1027         ctdb_db->generation = ctdb->vnn_map->generation;
1028
1029         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1030                             ctdb_db->db_path, tdb_flags));
1031
1032         /* success */
1033         return 0;
1034 }
1035
1036
/*
 * Bookkeeping for one database attach request whose processing has been
 * postponed.  Instances are linked into ctdb->deferred_attach.
 */
struct ctdb_deferred_attach_context {
        /* doubly-linked list linkage (used by the DLIST_* macros) */
        struct ctdb_deferred_attach_context *next;
        struct ctdb_deferred_attach_context *prev;

        /* daemon context whose deferred_attach list this entry lives on */
        struct ctdb_context *ctdb;

        /* the postponed control request packet */
        struct ctdb_req_control_old *c;
};
1042
1043
1044 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1045 {
1046         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1047
1048         return 0;
1049 }
1050
1051 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1052                                          struct tevent_timer *te,
1053                                          struct timeval t, void *private_data)
1054 {
1055         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1056         struct ctdb_context *ctdb = da_ctx->ctdb;
1057
1058         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1059         talloc_free(da_ctx);
1060 }
1061
1062 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1063                                           struct tevent_timer *te,
1064                                           struct timeval t, void *private_data)
1065 {
1066         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1067         struct ctdb_context *ctdb = da_ctx->ctdb;
1068
1069         /* This talloc-steals the packet ->c */
1070         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1071         talloc_free(da_ctx);
1072 }
1073
1074 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1075 {
1076         struct ctdb_deferred_attach_context *da_ctx;
1077
1078         /* call it from the main event loop as soon as the current event 
1079            finishes.
1080          */
1081         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1082                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1083                 tevent_add_timer(ctdb->ev, da_ctx,
1084                                  timeval_current_ofs(1,0),
1085                                  ctdb_deferred_attach_callback, da_ctx);
1086         }
1087
1088         return 0;
1089 }
1090
1091 /*
1092   a client has asked to attach a new database
1093  */
1094 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb,
1095                                TDB_DATA indata,
1096                                TDB_DATA *outdata,
1097                                uint8_t db_flags,
1098                                uint32_t srcnode,
1099                                uint32_t client_id,
1100                                struct ctdb_req_control_old *c,
1101                                bool *async_reply)
1102 {
1103         const char *db_name = (const char *)indata.dptr;
1104         struct ctdb_db_context *db;
1105         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1106         struct ctdb_client *client = NULL;
1107         uint32_t opcode;
1108
1109         if (ctdb->tunable.allow_client_db_attach == 0) {
1110                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1111                                   "AllowClientDBAccess == 0\n", db_name));
1112                 return -1;
1113         }
1114
1115         /* don't allow any local clients to attach while we are in recovery mode
1116          * except for the recovery daemon.
1117          * allow all attach from the network since these are always from remote
1118          * recovery daemons.
1119          */
1120         if (srcnode == ctdb->pnn && client_id != 0) {
1121                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1122         }
1123         if (client != NULL) {
1124                 /* If the node is inactive it is not part of the cluster
1125                    and we should not allow clients to attach to any
1126                    databases
1127                 */
1128                 if (node->flags & NODE_FLAGS_INACTIVE) {
1129                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1130                         return -1;
1131                 }
1132
1133                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1134                     client->pid != ctdb->recoverd_pid &&
1135                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1136                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1137
1138                         if (da_ctx == NULL) {
1139                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1140                                 return -1;
1141                         }
1142
1143                         da_ctx->ctdb = ctdb;
1144                         da_ctx->c = talloc_steal(da_ctx, c);
1145                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1146                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1147
1148                         tevent_add_timer(ctdb->ev, da_ctx,
1149                                          timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1150                                          ctdb_deferred_attach_timeout, da_ctx);
1151
1152                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1153                         *async_reply = true;
1154                         return 0;
1155                 }
1156         }
1157
1158         /* see if we already have this name */
1159         db = ctdb_db_handle(ctdb, db_name);
1160         if (db) {
1161                 if ((db->db_flags & db_flags) != db_flags) {
1162                         DEBUG(DEBUG_ERR,
1163                               ("Error: Failed to re-attach with 0x%x flags,"
1164                                " database has 0x%x flags\n", db_flags,
1165                                db->db_flags));
1166                         return -1;
1167                 }
1168                 outdata->dptr  = (uint8_t *)&db->db_id;
1169                 outdata->dsize = sizeof(db->db_id);
1170                 return 0;
1171         }
1172
1173         if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
1174                 return -1;
1175         }
1176
1177         db = ctdb_db_handle(ctdb, db_name);
1178         if (!db) {
1179                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1180                 return -1;
1181         }
1182
1183         outdata->dptr  = (uint8_t *)&db->db_id;
1184         outdata->dsize = sizeof(db->db_id);
1185
1186         /* Try to ensure it's locked in mem */
1187         lockdown_memory(ctdb->valgrinding);
1188
1189         if (ctdb_db_persistent(db)) {
1190                 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1191         } else if (ctdb_db_replicated(db)) {
1192                 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1193         } else {
1194                 opcode = CTDB_CONTROL_DB_ATTACH;
1195         }
1196
1197         /* tell all the other nodes about this database */
1198         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, opcode,
1199                                  0, CTDB_CTRL_FLAG_NOREPLY,
1200                                  indata, NULL, NULL);
1201
1202         /* success */
1203         return 0;
1204 }
1205
1206 /*
1207  * a client has asked to detach from a database
1208  */
1209 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1210                                uint32_t client_id)
1211 {
1212         uint32_t db_id;
1213         struct ctdb_db_context *ctdb_db;
1214         struct ctdb_client *client = NULL;
1215
1216         db_id = *(uint32_t *)indata.dptr;
1217         ctdb_db = find_ctdb_db(ctdb, db_id);
1218         if (ctdb_db == NULL) {
1219                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1220                                   db_id));
1221                 return -1;
1222         }
1223
1224         if (ctdb->tunable.allow_client_db_attach == 1) {
1225                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1226                                   "Clients are allowed access to databases "
1227                                   "(AllowClientDBAccess == 1)\n",
1228                                   ctdb_db->db_name));
1229                 return -1;
1230         }
1231
1232         if (! ctdb_db_volatile(ctdb_db)) {
1233                 DEBUG(DEBUG_ERR,
1234                       ("Detaching non-volatile database %s denied\n",
1235                        ctdb_db->db_name));
1236                 return -1;
1237         }
1238
1239         /* Cannot detach from database when in recovery */
1240         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1241                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1242                 return -1;
1243         }
1244
1245         /* If a control comes from a client, then broadcast it to all nodes.
1246          * Do the actual detach only if the control comes from other daemons.
1247          */
1248         if (client_id != 0) {
1249                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1250                 if (client != NULL) {
1251                         /* forward the control to all the nodes */
1252                         ctdb_daemon_send_control(ctdb,
1253                                                  CTDB_BROADCAST_CONNECTED, 0,
1254                                                  CTDB_CONTROL_DB_DETACH, 0,
1255                                                  CTDB_CTRL_FLAG_NOREPLY,
1256                                                  indata, NULL, NULL);
1257                         return 0;
1258                 }
1259                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1260                                   "for database '%s'\n", ctdb_db->db_name));
1261                 return -1;
1262         }
1263
1264         /* Detach database from recoverd */
1265         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1266                                      CTDB_SRVID_DETACH_DATABASE,
1267                                      indata) != 0) {
1268                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1269                 return -1;
1270         }
1271
1272         /* Disable vacuuming and drop all vacuuming data */
1273         talloc_free(ctdb_db->vacuum_handle);
1274         talloc_free(ctdb_db->delete_queue);
1275
1276         /* Terminate any deferred fetch */
1277         talloc_free(ctdb_db->deferred_fetch);
1278
1279         /* Terminate any traverses */
1280         while (ctdb_db->traverse) {
1281                 talloc_free(ctdb_db->traverse);
1282         }
1283
1284         /* Terminate any revokes */
1285         while (ctdb_db->revokechild_active) {
1286                 talloc_free(ctdb_db->revokechild_active);
1287         }
1288
1289         /* Free readonly tracking database */
1290         if (ctdb_db_readonly(ctdb_db)) {
1291                 talloc_free(ctdb_db->rottdb);
1292         }
1293
1294         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1295
1296         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1297                              ctdb_db->db_name));
1298         talloc_free(ctdb_db);
1299
1300         return 0;
1301 }
1302
1303 /*
1304   attach to all existing persistent databases
1305  */
1306 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1307                                   const char *unhealthy_reason)
1308 {
1309         DIR *d;
1310         struct dirent *de;
1311
1312         /* open the persistent db directory and scan it for files */
1313         d = opendir(ctdb->db_directory_persistent);
1314         if (d == NULL) {
1315                 return 0;
1316         }
1317
1318         while ((de=readdir(d))) {
1319                 char *p, *s, *q;
1320                 size_t len = strlen(de->d_name);
1321                 uint32_t node;
1322                 int invalid_name = 0;
1323                 
1324                 s = talloc_strdup(ctdb, de->d_name);
1325                 if (s == NULL) {
1326                         closedir(d);
1327                         CTDB_NO_MEMORY(ctdb, s);
1328                 }
1329
1330                 /* only accept names ending in .tdb */
1331                 p = strstr(s, ".tdb.");
1332                 if (len < 7 || p == NULL) {
1333                         talloc_free(s);
1334                         continue;
1335                 }
1336
1337                 /* only accept names ending with .tdb. and any number of digits */
1338                 q = p+5;
1339                 while (*q != 0 && invalid_name == 0) {
1340                         if (!isdigit(*q++)) {
1341                                 invalid_name = 1;
1342                         }
1343                 }
1344                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1345                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1346                         talloc_free(s);
1347                         continue;
1348                 }
1349                 p[4] = 0;
1350
1351                 if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
1352                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1353                         closedir(d);
1354                         talloc_free(s);
1355                         return -1;
1356                 }
1357
1358                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1359
1360                 talloc_free(s);
1361         }
1362         closedir(d);
1363         return 0;
1364 }
1365
1366 int ctdb_attach_databases(struct ctdb_context *ctdb)
1367 {
1368         int ret;
1369         char *persistent_health_path = NULL;
1370         char *unhealthy_reason = NULL;
1371         bool first_try = true;
1372
1373         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1374                                                  ctdb->db_directory_state,
1375                                                  PERSISTENT_HEALTH_TDB,
1376                                                  ctdb->pnn);
1377         if (persistent_health_path == NULL) {
1378                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1379                 return -1;
1380         }
1381
1382 again:
1383
1384         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1385                                                    0, TDB_DISALLOW_NESTING,
1386                                                    O_CREAT | O_RDWR, 0600);
1387         if (ctdb->db_persistent_health == NULL) {
1388                 struct tdb_wrap *tdb;
1389
1390                 if (!first_try) {
1391                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1392                                           persistent_health_path,
1393                                           errno,
1394                                           strerror(errno)));
1395                         talloc_free(persistent_health_path);
1396                         talloc_free(unhealthy_reason);
1397                         return -1;
1398                 }
1399                 first_try = false;
1400
1401                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1402                                                    persistent_health_path,
1403                                                    "was cleared after a failure",
1404                                                    "manual verification needed");
1405                 if (unhealthy_reason == NULL) {
1406                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1407                         talloc_free(persistent_health_path);
1408                         return -1;
1409                 }
1410
1411                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1412                                   persistent_health_path));
1413                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1414                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1415                                     O_CREAT | O_RDWR, 0600);
1416                 if (tdb) {
1417                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1418                                           persistent_health_path,
1419                                           errno,
1420                                           strerror(errno)));
1421                         talloc_free(persistent_health_path);
1422                         talloc_free(unhealthy_reason);
1423                         return -1;
1424                 }
1425
1426                 talloc_free(tdb);
1427                 goto again;
1428         }
1429         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1430         if (ret != 0) {
1431                 struct tdb_wrap *tdb;
1432
1433                 talloc_free(ctdb->db_persistent_health);
1434                 ctdb->db_persistent_health = NULL;
1435
1436                 if (!first_try) {
1437                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1438                                           persistent_health_path));
1439                         talloc_free(persistent_health_path);
1440                         talloc_free(unhealthy_reason);
1441                         return -1;
1442                 }
1443                 first_try = false;
1444
1445                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1446                                                    persistent_health_path,
1447                                                    "was cleared after a failure",
1448                                                    "manual verification needed");
1449                 if (unhealthy_reason == NULL) {
1450                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1451                         talloc_free(persistent_health_path);
1452                         return -1;
1453                 }
1454
1455                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1456                                   persistent_health_path));
1457                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1458                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1459                                     O_CREAT | O_RDWR, 0600);
1460                 if (tdb) {
1461                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1462                                           persistent_health_path,
1463                                           errno,
1464                                           strerror(errno)));
1465                         talloc_free(persistent_health_path);
1466                         talloc_free(unhealthy_reason);
1467                         return -1;
1468                 }
1469
1470                 talloc_free(tdb);
1471                 goto again;
1472         }
1473         talloc_free(persistent_health_path);
1474
1475         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1476         talloc_free(unhealthy_reason);
1477         if (ret != 0) {
1478                 return ret;
1479         }
1480
1481         return 0;
1482 }
1483
1484 /*
1485   called when a broadcast seqnum update comes in
1486  */
1487 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1488 {
1489         struct ctdb_db_context *ctdb_db;
1490         if (srcnode == ctdb->pnn) {
1491                 /* don't update ourselves! */
1492                 return 0;
1493         }
1494
1495         ctdb_db = find_ctdb_db(ctdb, db_id);
1496         if (!ctdb_db) {
1497                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1498                 return -1;
1499         }
1500
1501         if (ctdb_db->unhealthy_reason) {
1502                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1503                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1504                 return -1;
1505         }
1506
1507         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1508         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1509         return 0;
1510 }
1511
1512 /*
1513   timer to check for seqnum changes in a ltdb and propagate them
1514  */
1515 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1516                                    struct tevent_timer *te,
1517                                    struct timeval t, void *p)
1518 {
1519         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1520         struct ctdb_context *ctdb = ctdb_db->ctdb;
1521         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1522         if (new_seqnum != ctdb_db->seqnum) {
1523                 /* something has changed - propagate it */
1524                 TDB_DATA data;
1525                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1526                 data.dsize = sizeof(uint32_t);
1527                 ctdb_daemon_send_control(ctdb,
1528                                          CTDB_BROADCAST_ACTIVE,
1529                                          0,
1530                                          CTDB_CONTROL_UPDATE_SEQNUM,
1531                                          0,
1532                                          CTDB_CTRL_FLAG_NOREPLY,
1533                                          data,
1534                                          NULL,
1535                                          NULL);
1536         }
1537         ctdb_db->seqnum = new_seqnum;
1538
1539         /* setup a new timer */
1540         ctdb_db->seqnum_update =
1541                 tevent_add_timer(ctdb->ev, ctdb_db,
1542                                  timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1543                                                      (ctdb->tunable.seqnum_interval%1000)*1000),
1544                                  ctdb_ltdb_seqnum_check, ctdb_db);
1545 }
1546
1547 /*
1548   enable seqnum handling on this db
1549  */
1550 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1551 {
1552         struct ctdb_db_context *ctdb_db;
1553         ctdb_db = find_ctdb_db(ctdb, db_id);
1554         if (!ctdb_db) {
1555                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1556                 return -1;
1557         }
1558
1559         if (ctdb_db->seqnum_update == NULL) {
1560                 ctdb_db->seqnum_update = tevent_add_timer(
1561                         ctdb->ev, ctdb_db,
1562                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1563                                             (ctdb->tunable.seqnum_interval%1000)*1000),
1564                         ctdb_ltdb_seqnum_check, ctdb_db);
1565         }
1566
1567         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1568         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1569         return 0;
1570 }
1571
1572 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1573 {
1574         if (ctdb_db_sticky(ctdb_db)) {
1575                 return 0;
1576         }
1577
1578         if (! ctdb_db_volatile(ctdb_db)) {
1579                 DEBUG(DEBUG_ERR,
1580                       ("Non-volatile databases do not support sticky flag\n"));
1581                 return -1;
1582         }
1583
1584         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1585
1586         ctdb_db_set_sticky(ctdb_db);
1587
1588         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1589
1590         return 0;
1591 }
1592
1593 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1594 {
1595         struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1596         int i;
1597
1598         for (i=0; i<MAX_HOT_KEYS; i++) {
1599                 if (s->hot_keys[i].key.dsize > 0) {
1600                         talloc_free(s->hot_keys[i].key.dptr);
1601                 }
1602         }
1603
1604         ZERO_STRUCT(ctdb_db->statistics);
1605 }
1606
1607 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1608                                 uint32_t db_id,
1609                                 TDB_DATA *outdata)
1610 {
1611         struct ctdb_db_context *ctdb_db;
1612         struct ctdb_db_statistics_old *stats;
1613         int i;
1614         int len;
1615         char *ptr;
1616
1617         ctdb_db = find_ctdb_db(ctdb, db_id);
1618         if (!ctdb_db) {
1619                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1620                 return -1;
1621         }
1622
1623         len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1624         for (i = 0; i < MAX_HOT_KEYS; i++) {
1625                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1626         }
1627
1628         stats = talloc_size(outdata, len);
1629         if (stats == NULL) {
1630                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1631                 return -1;
1632         }
1633
1634         memcpy(stats, &ctdb_db->statistics,
1635                offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1636
1637         stats->num_hot_keys = MAX_HOT_KEYS;
1638
1639         ptr = &stats->hot_keys_wire[0];
1640         for (i = 0; i < MAX_HOT_KEYS; i++) {
1641                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1642                        ctdb_db->statistics.hot_keys[i].key.dsize);
1643                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1644         }
1645
1646         outdata->dptr  = (uint8_t *)stats;
1647         outdata->dsize = len;
1648
1649         return 0;
1650 }