Don't allow client processes to attach to databases while we are still in recovery...
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "db_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
33
/*
  Dummy "null" call handler that every database supports: it does no
  work on the record and always reports success (0).
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
        (void)call;     /* intentionally unused */
        return 0;
}
41
42 /*
43   this is a plain fetch procedure that all databases support
44 */
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
46 {
47         call->reply_data = &call->record_data;
48         return 0;
49 }
50
51
52
/*
  State kept for a request that is waiting for a chainlock, holding
  what lock_fetch_callback() needs to re-submit the packet later.
*/
struct lock_fetch_state {
        struct ctdb_context *ctdb;
        /* receive function the packet is handed back to, and its argument */
        void (*recv_pkt)(void *, struct ctdb_req_header *);
        void *recv_context;
        /* the deferred request packet */
        struct ctdb_req_header *hdr;
        /* vnn_map generation recorded when the request was deferred */
        uint32_t generation;
        /* when true, requeue the packet even if the generation changed */
        bool ignore_generation;
};
61
62 /*
63   called when we should retry the operation
64  */
65 static void lock_fetch_callback(void *p)
66 {
67         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
68         if (!state->ignore_generation &&
69             state->generation != state->ctdb->vnn_map->generation) {
70                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71                 talloc_free(state->hdr);
72                 return;
73         }
74         state->recv_pkt(state->recv_context, state->hdr);
75         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
76 }
77
78
79 /*
80   do a non-blocking ltdb_lock, deferring this ctdb request until we
81   have the chainlock
82
83   It does the following:
84
85    1) tries to get the chainlock. If it succeeds, then it returns 0
86
87    2) if it fails to get a chainlock immediately then it sets up a
88    non-blocking chainlock via ctdb_lockwait, and when it gets the
89    chainlock it re-submits this ctdb request to the main packet
90    receive function
91
92    This effectively queues all ctdb requests that cannot be
93    immediately satisfied until it can get the lock. This means that
94    the main ctdb daemon will not block waiting for a chainlock held by
95    a client
96
97    There are 3 possible return values:
98
99        0:    means that it got the lock immediately.
100       -1:    means that it failed to get the lock, and won't retry
101       -2:    means that it failed to get the lock immediately, but will retry
102  */
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
104                            TDB_DATA key, struct ctdb_req_header *hdr,
105                            void (*recv_pkt)(void *, struct ctdb_req_header *),
106                            void *recv_context, bool ignore_generation)
107 {
108         int ret;
109         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110         struct lockwait_handle *h;
111         struct lock_fetch_state *state;
112         
113         ret = tdb_chainlock_nonblock(tdb, key);
114
115         if (ret != 0 &&
116             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117                 /* a hard failure - don't try again */
118                 return -1;
119         }
120
121         /* when torturing, ensure we test the contended path */
122         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
123             random() % 5 == 0) {
124                 ret = -1;
125                 tdb_chainunlock(tdb, key);
126         }
127
128         /* first the non-contended path */
129         if (ret == 0) {
130                 return 0;
131         }
132
133         state = talloc(hdr, struct lock_fetch_state);
134         state->ctdb = ctdb_db->ctdb;
135         state->hdr = hdr;
136         state->recv_pkt = recv_pkt;
137         state->recv_context = recv_context;
138         state->generation = ctdb_db->ctdb->vnn_map->generation;
139         state->ignore_generation = ignore_generation;
140
141         /* now the contended path */
142         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
143         if (h == NULL) {
144                 return -1;
145         }
146
147         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
148            so it won't be freed yet */
149         talloc_steal(state, hdr);
150         talloc_steal(state, h);
151
152         /* now tell the caller than we will retry asynchronously */
153         return -2;
154 }
155
156 /*
157   a varient of ctdb_ltdb_lock_requeue that also fetches the record
158  */
159 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
160                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
161                                  struct ctdb_req_header *hdr, TDB_DATA *data,
162                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
163                                  void *recv_context, bool ignore_generation)
164 {
165         int ret;
166
167         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
168                                      recv_context, ignore_generation);
169         if (ret == 0) {
170                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
171                 if (ret != 0) {
172                         int uret;
173                         uret = ctdb_ltdb_unlock(ctdb_db, key);
174                         if (uret != 0) {
175                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
176                         }
177                 }
178         }
179         return ret;
180 }
181
182
183 /*
184   paraoid check to see if the db is empty
185  */
186 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
187 {
188         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
189         int count = tdb_traverse_read(tdb, NULL, NULL);
190         if (count != 0) {
191                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
192                          ctdb_db->db_path));
193                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
194         }
195 }
196
197 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
198                                 struct ctdb_db_context *ctdb_db)
199 {
200         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
201         char *old;
202         char *reason = NULL;
203         TDB_DATA key;
204         TDB_DATA val;
205
206         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
207         key.dsize = strlen(ctdb_db->db_name);
208
209         old = ctdb_db->unhealthy_reason;
210         ctdb_db->unhealthy_reason = NULL;
211
212         val = tdb_fetch(tdb, key);
213         if (val.dsize > 0) {
214                 reason = talloc_strndup(ctdb_db,
215                                         (const char *)val.dptr,
216                                         val.dsize);
217                 if (reason == NULL) {
218                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
219                                            (int)val.dsize));
220                         ctdb_db->unhealthy_reason = old;
221                         free(val.dptr);
222                         return -1;
223                 }
224         }
225
226         if (val.dptr) {
227                 free(val.dptr);
228         }
229
230         talloc_free(old);
231         ctdb_db->unhealthy_reason = reason;
232         return 0;
233 }
234
235 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
236                                   struct ctdb_db_context *ctdb_db,
237                                   const char *given_reason,/* NULL means healthy */
238                                   int num_healthy_nodes)
239 {
240         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
241         int ret;
242         TDB_DATA key;
243         TDB_DATA val;
244         char *new_reason = NULL;
245         char *old_reason = NULL;
246
247         ret = tdb_transaction_start(tdb);
248         if (ret != 0) {
249                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
250                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
251                 return -1;
252         }
253
254         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
255         if (ret != 0) {
256                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
257                                    ctdb_db->db_name, ret));
258                 return -1;
259         }
260         old_reason = ctdb_db->unhealthy_reason;
261
262         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
263         key.dsize = strlen(ctdb_db->db_name);
264
265         if (given_reason) {
266                 new_reason = talloc_strdup(ctdb_db, given_reason);
267                 if (new_reason == NULL) {
268                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
269                                           given_reason));
270                         return -1;
271                 }
272         } else if (old_reason && num_healthy_nodes == 0) {
273                 /*
274                  * If the reason indicates ok, but there where no healthy nodes
275                  * available, that it means, we have not recovered valid content
276                  * of the db. So if there's an old reason, prefix it with
277                  * "NO-HEALTHY-NODES - "
278                  */
279                 const char *prefix;
280
281 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
282                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
283                 if (ret != 0) {
284                         prefix = _TMP_PREFIX;
285                 } else {
286                         prefix = "";
287                 }
288                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
289                                          prefix, old_reason);
290                 if (new_reason == NULL) {
291                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
292                                           prefix, old_reason));
293                         return -1;
294                 }
295 #undef _TMP_PREFIX
296         }
297
298         if (new_reason) {
299                 val.dptr = discard_const_p(uint8_t, new_reason);
300                 val.dsize = strlen(new_reason);
301
302                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
303                 if (ret != 0) {
304                         tdb_transaction_cancel(tdb);
305                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
306                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
307                                            ret, tdb_errorstr(tdb)));
308                         talloc_free(new_reason);
309                         return -1;
310                 }
311                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
312                                    ctdb_db->db_name, new_reason));
313         } else if (old_reason) {
314                 ret = tdb_delete(tdb, key);
315                 if (ret != 0) {
316                         tdb_transaction_cancel(tdb);
317                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
318                                            tdb_name(tdb), ctdb_db->db_name,
319                                            ret, tdb_errorstr(tdb)));
320                         talloc_free(new_reason);
321                         return -1;
322                 }
323                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
324                                    ctdb_db->db_name));
325         }
326
327         ret = tdb_transaction_commit(tdb);
328         if (ret != TDB_SUCCESS) {
329                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
330                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
331                 talloc_free(new_reason);
332                 return -1;
333         }
334
335         talloc_free(old_reason);
336         ctdb_db->unhealthy_reason = new_reason;
337
338         return 0;
339 }
340
341 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
342                                      struct ctdb_db_context *ctdb_db)
343 {
344         time_t now = time(NULL);
345         char *new_path;
346         char *new_reason;
347         int ret;
348         struct tm *tm;
349
350         tm = gmtime(&now);
351
352         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
353         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
354                                    "%04u%02u%02u%02u%02u%02u.0Z",
355                                    ctdb_db->db_path,
356                                    tm->tm_year+1900, tm->tm_mon+1,
357                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
358                                    tm->tm_sec);
359         if (new_path == NULL) {
360                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
361                 return -1;
362         }
363
364         new_reason = talloc_asprintf(ctdb_db,
365                                      "ERROR - Backup of corrupted TDB in '%s'",
366                                      new_path);
367         if (new_reason == NULL) {
368                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
369                 return -1;
370         }
371         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
372         talloc_free(new_reason);
373         if (ret != 0) {
374                 DEBUG(DEBUG_CRIT,(__location__
375                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
376                                  ctdb_db->db_path));
377                 return -1;
378         }
379
380         ret = rename(ctdb_db->db_path, new_path);
381         if (ret != 0) {
382                 DEBUG(DEBUG_CRIT,(__location__
383                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
384                                   ctdb_db->db_path, new_path,
385                                   errno, strerror(errno)));
386                 talloc_free(new_path);
387                 return -1;
388         }
389
390         DEBUG(DEBUG_CRIT,(__location__
391                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
392                          ctdb_db->db_path, new_path));
393         talloc_free(new_path);
394         return 0;
395 }
396
397 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
398 {
399         struct ctdb_db_context *ctdb_db;
400         int ret;
401         int ok = 0;
402         int fail = 0;
403
404         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
405                 if (!ctdb_db->persistent) {
406                         continue;
407                 }
408
409                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
410                 if (ret != 0) {
411                         DEBUG(DEBUG_ALERT,(__location__
412                                            " load persistent health for '%s' failed\n",
413                                            ctdb_db->db_path));
414                         return -1;
415                 }
416
417                 if (ctdb_db->unhealthy_reason == NULL) {
418                         ok++;
419                         DEBUG(DEBUG_INFO,(__location__
420                                    " persistent db '%s' healthy\n",
421                                    ctdb_db->db_path));
422                         continue;
423                 }
424
425                 fail++;
426                 DEBUG(DEBUG_ALERT,(__location__
427                                    " persistent db '%s' unhealthy: %s\n",
428                                    ctdb_db->db_path,
429                                    ctdb_db->unhealthy_reason));
430         }
431         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
432               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
433                ok, fail));
434
435         if (fail != 0) {
436                 return -1;
437         }
438
439         return 0;
440 }
441
442
443 /*
444   mark a database - as healthy
445  */
446 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
447 {
448         uint32_t db_id = *(uint32_t *)indata.dptr;
449         struct ctdb_db_context *ctdb_db;
450         int ret;
451         bool may_recover = false;
452
453         ctdb_db = find_ctdb_db(ctdb, db_id);
454         if (!ctdb_db) {
455                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
456                 return -1;
457         }
458
459         if (ctdb_db->unhealthy_reason) {
460                 may_recover = true;
461         }
462
463         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
464         if (ret != 0) {
465                 DEBUG(DEBUG_ERR,(__location__
466                                  " ctdb_update_persistent_health(%s) failed\n",
467                                  ctdb_db->db_name));
468                 return -1;
469         }
470
471         if (may_recover && !ctdb->done_startup) {
472                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
473                                   ctdb_db->db_name));
474                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
475         }
476
477         return 0;
478 }
479
480 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
481                                    TDB_DATA indata,
482                                    TDB_DATA *outdata)
483 {
484         uint32_t db_id = *(uint32_t *)indata.dptr;
485         struct ctdb_db_context *ctdb_db;
486         int ret;
487
488         ctdb_db = find_ctdb_db(ctdb, db_id);
489         if (!ctdb_db) {
490                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
491                 return -1;
492         }
493
494         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
495         if (ret != 0) {
496                 DEBUG(DEBUG_ERR,(__location__
497                                  " ctdb_load_persistent_health(%s) failed\n",
498                                  ctdb_db->db_name));
499                 return -1;
500         }
501
502         *outdata = tdb_null;
503         if (ctdb_db->unhealthy_reason) {
504                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
505                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
506         }
507
508         return 0;
509 }
510
511 /*
512   attach to a database, handling both persistent and non-persistent databases
513   return 0 on success, -1 on failure
514  */
515 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
516                              bool persistent, const char *unhealthy_reason,
517                              bool jenkinshash)
518 {
519         struct ctdb_db_context *ctdb_db, *tmp_db;
520         int ret;
521         struct TDB_DATA key;
522         unsigned tdb_flags;
523         int mode = 0600;
524         int remaining_tries = 0;
525
526         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
527         CTDB_NO_MEMORY(ctdb, ctdb_db);
528
529         ctdb_db->priority = 1;
530         ctdb_db->ctdb = ctdb;
531         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
532         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
533
534         key.dsize = strlen(db_name)+1;
535         key.dptr  = discard_const(db_name);
536         ctdb_db->db_id = ctdb_hash(&key);
537         ctdb_db->persistent = persistent;
538
539         /* check for hash collisions */
540         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
541                 if (tmp_db->db_id == ctdb_db->db_id) {
542                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
543                                  tmp_db->db_id, db_name, tmp_db->db_name));
544                         talloc_free(ctdb_db);
545                         return -1;
546                 }
547         }
548
549         if (persistent) {
550                 if (unhealthy_reason) {
551                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
552                                                             unhealthy_reason, 0);
553                         if (ret != 0) {
554                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
555                                                    ctdb_db->db_name, unhealthy_reason, ret));
556                                 talloc_free(ctdb_db);
557                                 return -1;
558                         }
559                 }
560
561                 if (ctdb->max_persistent_check_errors > 0) {
562                         remaining_tries = 1;
563                 }
564                 if (ctdb->done_startup) {
565                         remaining_tries = 0;
566                 }
567
568                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
569                 if (ret != 0) {
570                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
571                                    ctdb_db->db_name, ret));
572                         talloc_free(ctdb_db);
573                         return -1;
574                 }
575         }
576
577         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
578                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
579                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
580                 talloc_free(ctdb_db);
581                 return -1;
582         }
583
584         if (ctdb_db->unhealthy_reason) {
585                 /* this is just a warning, but we want that in the log file! */
586                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
587                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
588         }
589
590         /* open the database */
591         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
592                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
593                                            db_name, ctdb->pnn);
594
595         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
596         if (ctdb->valgrinding) {
597                 tdb_flags |= TDB_NOMMAP;
598         }
599         tdb_flags |= TDB_DISALLOW_NESTING;
600         if (jenkinshash) {
601                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
602         }
603
604 again:
605         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
606                                       ctdb->tunable.database_hash_size, 
607                                       tdb_flags, 
608                                       O_CREAT|O_RDWR, mode);
609         if (ctdb_db->ltdb == NULL) {
610                 struct stat st;
611                 int saved_errno = errno;
612
613                 if (!persistent) {
614                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
615                                           ctdb_db->db_path,
616                                           saved_errno,
617                                           strerror(saved_errno)));
618                         talloc_free(ctdb_db);
619                         return -1;
620                 }
621
622                 if (remaining_tries == 0) {
623                         DEBUG(DEBUG_CRIT,(__location__
624                                           "Failed to open persistent tdb '%s': %d - %s\n",
625                                           ctdb_db->db_path,
626                                           saved_errno,
627                                           strerror(saved_errno)));
628                         talloc_free(ctdb_db);
629                         return -1;
630                 }
631
632                 ret = stat(ctdb_db->db_path, &st);
633                 if (ret != 0) {
634                         DEBUG(DEBUG_CRIT,(__location__
635                                           "Failed to open persistent tdb '%s': %d - %s\n",
636                                           ctdb_db->db_path,
637                                           saved_errno,
638                                           strerror(saved_errno)));
639                         talloc_free(ctdb_db);
640                         return -1;
641                 }
642
643                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
644                 if (ret != 0) {
645                         DEBUG(DEBUG_CRIT,(__location__
646                                           "Failed to open persistent tdb '%s': %d - %s\n",
647                                           ctdb_db->db_path,
648                                           saved_errno,
649                                           strerror(saved_errno)));
650                         talloc_free(ctdb_db);
651                         return -1;
652                 }
653
654                 remaining_tries--;
655                 mode = st.st_mode;
656                 goto again;
657         }
658
659         if (!persistent) {
660                 ctdb_check_db_empty(ctdb_db);
661         } else {
662                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
663                 if (ret != 0) {
664                         int fd;
665                         struct stat st;
666
667                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
668                                           ctdb_db->db_path, ret,
669                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
670                         if (remaining_tries == 0) {
671                                 talloc_free(ctdb_db);
672                                 return -1;
673                         }
674
675                         fd = tdb_fd(ctdb_db->ltdb->tdb);
676                         ret = fstat(fd, &st);
677                         if (ret != 0) {
678                                 DEBUG(DEBUG_CRIT,(__location__
679                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
680                                                   ctdb_db->db_path,
681                                                   errno,
682                                                   strerror(errno)));
683                                 talloc_free(ctdb_db);
684                                 return -1;
685                         }
686
687                         /* close the TDB */
688                         talloc_free(ctdb_db->ltdb);
689                         ctdb_db->ltdb = NULL;
690
691                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
692                         if (ret != 0) {
693                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
694                                                   ctdb_db->db_path));
695                                 talloc_free(ctdb_db);
696                                 return -1;
697                         }
698
699                         remaining_tries--;
700                         mode = st.st_mode;
701                         goto again;
702                 }
703         }
704
705         DLIST_ADD(ctdb->db_list, ctdb_db);
706
707         /* setting this can help some high churn databases */
708         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
709
710         /* 
711            all databases support the "null" function. we need this in
712            order to do forced migration of records
713         */
714         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
715         if (ret != 0) {
716                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
717                 talloc_free(ctdb_db);
718                 return -1;
719         }
720
721         /* 
722            all databases support the "fetch" function. we need this
723            for efficient Samba3 ctdb fetch
724         */
725         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
726         if (ret != 0) {
727                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
728                 talloc_free(ctdb_db);
729                 return -1;
730         }
731
732         ret = ctdb_vacuum_init(ctdb_db);
733         if (ret != 0) {
734                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
735                                   "database '%s'\n", ctdb_db->db_name));
736                 talloc_free(ctdb_db);
737                 return -1;
738         }
739
740
741         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
742         
743         /* success */
744         return 0;
745 }
746
747
748 /*
749   a client has asked to attach a new database
750  */
751 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
752                                TDB_DATA *outdata, uint64_t tdb_flags, 
753                                bool persistent, uint32_t client_id)
754 {
755         const char *db_name = (const char *)indata.dptr;
756         struct ctdb_db_context *db;
757         struct ctdb_node *node;
758
759         /* dont allow any local clients to attach while we are in recovery mode
760          * except for the recovery daemon.
761          * allow all attach from the network since these are always from remote
762          * recovery daemons.
763          */
764         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE && client_id != 0) {
765                 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
766
767                 if (client != NULL && client->pid != ctdb->recoverd_pid) {
768                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
769                         return -1;
770                 }
771         }
772
773         node = ctdb->nodes[ctdb->pnn];
774
775         /* the client can optionally pass additional tdb flags, but we
776            only allow a subset of those on the database in ctdb. Note
777            that tdb_flags is passed in via the (otherwise unused)
778            srvid to the attach control */
779         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
780
781         /* If the node is inactive it is not part of the cluster
782            and we should not allow clients to attach to any
783            databases
784         */
785         if (node->flags & NODE_FLAGS_INACTIVE) {
786                 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
787                 return -1;
788         }
789
790
791         /* see if we already have this name */
792         db = ctdb_db_handle(ctdb, db_name);
793         if (db) {
794                 outdata->dptr  = (uint8_t *)&db->db_id;
795                 outdata->dsize = sizeof(db->db_id);
796                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
797                 return 0;
798         }
799
800         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
801                 return -1;
802         }
803
804         db = ctdb_db_handle(ctdb, db_name);
805         if (!db) {
806                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
807                 return -1;
808         }
809
810         /* remember the flags the client has specified */
811         tdb_add_flags(db->ltdb->tdb, tdb_flags);
812
813         outdata->dptr  = (uint8_t *)&db->db_id;
814         outdata->dsize = sizeof(db->db_id);
815
816         /* Try to ensure it's locked in mem */
817         ctdb_lockdown_memory(ctdb);
818
819         /* tell all the other nodes about this database */
820         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
821                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
822                                                 CTDB_CONTROL_DB_ATTACH,
823                                  0, CTDB_CTRL_FLAG_NOREPLY,
824                                  indata, NULL, NULL);
825
826         /* success */
827         return 0;
828 }
829
830
831 /*
832   attach to all existing persistent databases
833  */
834 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
835                                   const char *unhealthy_reason)
836 {
837         DIR *d;
838         struct dirent *de;
839
840         /* open the persistent db directory and scan it for files */
841         d = opendir(ctdb->db_directory_persistent);
842         if (d == NULL) {
843                 return 0;
844         }
845
846         while ((de=readdir(d))) {
847                 char *p, *s, *q;
848                 size_t len = strlen(de->d_name);
849                 uint32_t node;
850                 int invalid_name = 0;
851                 
852                 s = talloc_strdup(ctdb, de->d_name);
853                 CTDB_NO_MEMORY(ctdb, s);
854
855                 /* only accept names ending in .tdb */
856                 p = strstr(s, ".tdb.");
857                 if (len < 7 || p == NULL) {
858                         talloc_free(s);
859                         continue;
860                 }
861
862                 /* only accept names ending with .tdb. and any number of digits */
863                 q = p+5;
864                 while (*q != 0 && invalid_name == 0) {
865                         if (!isdigit(*q++)) {
866                                 invalid_name = 1;
867                         }
868                 }
869                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
870                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
871                         talloc_free(s);
872                         continue;
873                 }
874                 p[4] = 0;
875
876                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
877                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
878                         closedir(d);
879                         talloc_free(s);
880                         return -1;
881                 }
882
883                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
884
885                 talloc_free(s);
886         }
887         closedir(d);
888         return 0;
889 }
890
891 int ctdb_attach_databases(struct ctdb_context *ctdb)
892 {
893         int ret;
894         char *persistent_health_path = NULL;
895         char *unhealthy_reason = NULL;
896         bool first_try = true;
897
898         if (ctdb->db_directory == NULL) {
899                 ctdb->db_directory = VARDIR "/ctdb";
900         }
901         if (ctdb->db_directory_persistent == NULL) {
902                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
903         }
904         if (ctdb->db_directory_state == NULL) {
905                 ctdb->db_directory_state = VARDIR "/ctdb/state";
906         }
907
908         /* make sure the db directory exists */
909         ret = mkdir(ctdb->db_directory, 0700);
910         if (ret == -1 && errno != EEXIST) {
911                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
912                          ctdb->db_directory));
913                 return -1;
914         }
915
916         /* make sure the persistent db directory exists */
917         ret = mkdir(ctdb->db_directory_persistent, 0700);
918         if (ret == -1 && errno != EEXIST) {
919                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
920                          ctdb->db_directory_persistent));
921                 return -1;
922         }
923
924         /* make sure the internal state db directory exists */
925         ret = mkdir(ctdb->db_directory_state, 0700);
926         if (ret == -1 && errno != EEXIST) {
927                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
928                          ctdb->db_directory_state));
929                 return -1;
930         }
931
932         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
933                                                  ctdb->db_directory_state,
934                                                  PERSISTENT_HEALTH_TDB,
935                                                  ctdb->pnn);
936         if (persistent_health_path == NULL) {
937                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
938                 return -1;
939         }
940
941 again:
942
943         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
944                                                    0, TDB_DISALLOW_NESTING,
945                                                    O_CREAT | O_RDWR, 0600);
946         if (ctdb->db_persistent_health == NULL) {
947                 struct tdb_wrap *tdb;
948
949                 if (!first_try) {
950                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
951                                           persistent_health_path,
952                                           errno,
953                                           strerror(errno)));
954                         talloc_free(persistent_health_path);
955                         talloc_free(unhealthy_reason);
956                         return -1;
957                 }
958                 first_try = false;
959
960                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
961                                                    persistent_health_path,
962                                                    "was cleared after a failure",
963                                                    "manual verification needed");
964                 if (unhealthy_reason == NULL) {
965                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
966                         talloc_free(persistent_health_path);
967                         return -1;
968                 }
969
970                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
971                                   persistent_health_path));
972                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
973                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
974                                     O_CREAT | O_RDWR, 0600);
975                 if (tdb) {
976                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
977                                           persistent_health_path,
978                                           errno,
979                                           strerror(errno)));
980                         talloc_free(persistent_health_path);
981                         talloc_free(unhealthy_reason);
982                         return -1;
983                 }
984
985                 talloc_free(tdb);
986                 goto again;
987         }
988         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
989         if (ret != 0) {
990                 struct tdb_wrap *tdb;
991
992                 talloc_free(ctdb->db_persistent_health);
993                 ctdb->db_persistent_health = NULL;
994
995                 if (!first_try) {
996                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
997                                           persistent_health_path));
998                         talloc_free(persistent_health_path);
999                         talloc_free(unhealthy_reason);
1000                         return -1;
1001                 }
1002                 first_try = false;
1003
1004                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1005                                                    persistent_health_path,
1006                                                    "was cleared after a failure",
1007                                                    "manual verification needed");
1008                 if (unhealthy_reason == NULL) {
1009                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1010                         talloc_free(persistent_health_path);
1011                         return -1;
1012                 }
1013
1014                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1015                                   persistent_health_path));
1016                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1017                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1018                                     O_CREAT | O_RDWR, 0600);
1019                 if (tdb) {
1020                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1021                                           persistent_health_path,
1022                                           errno,
1023                                           strerror(errno)));
1024                         talloc_free(persistent_health_path);
1025                         talloc_free(unhealthy_reason);
1026                         return -1;
1027                 }
1028
1029                 talloc_free(tdb);
1030                 goto again;
1031         }
1032         talloc_free(persistent_health_path);
1033
1034         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1035         talloc_free(unhealthy_reason);
1036         if (ret != 0) {
1037                 return ret;
1038         }
1039
1040         return 0;
1041 }
1042
1043 /*
1044   called when a broadcast seqnum update comes in
1045  */
1046 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1047 {
1048         struct ctdb_db_context *ctdb_db;
1049         if (srcnode == ctdb->pnn) {
1050                 /* don't update ourselves! */
1051                 return 0;
1052         }
1053
1054         ctdb_db = find_ctdb_db(ctdb, db_id);
1055         if (!ctdb_db) {
1056                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1057                 return -1;
1058         }
1059
1060         if (ctdb_db->unhealthy_reason) {
1061                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1062                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1063                 return -1;
1064         }
1065
1066         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1067         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1068         return 0;
1069 }
1070
1071 /*
1072   timer to check for seqnum changes in a ltdb and propogate them
1073  */
1074 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1075                                    struct timeval t, void *p)
1076 {
1077         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1078         struct ctdb_context *ctdb = ctdb_db->ctdb;
1079         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1080         if (new_seqnum != ctdb_db->seqnum) {
1081                 /* something has changed - propogate it */
1082                 TDB_DATA data;
1083                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1084                 data.dsize = sizeof(uint32_t);
1085                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1086                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1087                                          data, NULL, NULL);             
1088         }
1089         ctdb_db->seqnum = new_seqnum;
1090
1091         /* setup a new timer */
1092         ctdb_db->seqnum_update =
1093                 event_add_timed(ctdb->ev, ctdb_db, 
1094                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1095                                 ctdb_ltdb_seqnum_check, ctdb_db);
1096 }
1097
1098 /*
1099   enable seqnum handling on this db
1100  */
1101 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1102 {
1103         struct ctdb_db_context *ctdb_db;
1104         ctdb_db = find_ctdb_db(ctdb, db_id);
1105         if (!ctdb_db) {
1106                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1107                 return -1;
1108         }
1109
1110         if (ctdb_db->seqnum_update == NULL) {
1111                 ctdb_db->seqnum_update =
1112                         event_add_timed(ctdb->ev, ctdb_db, 
1113                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1114                                         ctdb_ltdb_seqnum_check, ctdb_db);
1115         }
1116
1117         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1118         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1119         return 0;
1120 }
1121
1122 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1123 {
1124         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1125         struct ctdb_db_context *ctdb_db;
1126
1127         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1128         if (!ctdb_db) {
1129                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1130                 return -1;
1131         }
1132
1133         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1134                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1135                 return -1;
1136         }
1137
1138         ctdb_db->priority = db_prio->priority;
1139         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1140
1141         return 0;
1142 }
1143
1144