add additional logging when tdb_chainunlock() fails
[metze/ctdb/wip.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "db_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
33
/*
  the "null" call handler: it does no work at all and always reports
  success
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
        (void)call;     /* the call data is deliberately ignored */
        return 0;
}
41
42 /*
43   this is a plain fetch procedure that all databases support
44 */
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
46 {
47         call->reply_data = &call->record_data;
48         return 0;
49 }
50
51
52
/*
  state kept while a request waits for a contended chainlock
*/
struct lock_fetch_state {
        /* daemon context, used to read the current vnn map generation */
        struct ctdb_context *ctdb;

        /* function the deferred packet is handed back to, and its argument */
        void (*recv_pkt)(void *, struct ctdb_req_header *);
        void *recv_context;

        /* the deferred request packet */
        struct ctdb_req_header *hdr;

        /* vnn map generation recorded when the request was deferred */
        uint32_t generation;

        /* when true the packet is requeued even if the generation changed */
        bool ignore_generation;
};
61
62 /*
63   called when we should retry the operation
64  */
65 static void lock_fetch_callback(void *p)
66 {
67         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
68         if (!state->ignore_generation &&
69             state->generation != state->ctdb->vnn_map->generation) {
70                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71                 talloc_free(state->hdr);
72                 return;
73         }
74         state->recv_pkt(state->recv_context, state->hdr);
75         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
76 }
77
78
79 /*
80   do a non-blocking ltdb_lock, deferring this ctdb request until we
81   have the chainlock
82
83   It does the following:
84
85    1) tries to get the chainlock. If it succeeds, then it returns 0
86
87    2) if it fails to get a chainlock immediately then it sets up a
88    non-blocking chainlock via ctdb_lockwait, and when it gets the
89    chainlock it re-submits this ctdb request to the main packet
90    receive function
91
92    This effectively queues all ctdb requests that cannot be
93    immediately satisfied until it can get the lock. This means that
94    the main ctdb daemon will not block waiting for a chainlock held by
95    a client
96
97    There are 3 possible return values:
98
99        0:    means that it got the lock immediately.
100       -1:    means that it failed to get the lock, and won't retry
101       -2:    means that it failed to get the lock immediately, but will retry
102  */
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
104                            TDB_DATA key, struct ctdb_req_header *hdr,
105                            void (*recv_pkt)(void *, struct ctdb_req_header *),
106                            void *recv_context, bool ignore_generation)
107 {
108         int ret;
109         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110         struct lockwait_handle *h;
111         struct lock_fetch_state *state;
112         
113         ret = tdb_chainlock_nonblock(tdb, key);
114
115         if (ret != 0 &&
116             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117                 /* a hard failure - don't try again */
118                 return -1;
119         }
120
121         /* when torturing, ensure we test the contended path */
122         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
123             random() % 5 == 0) {
124                 ret = -1;
125                 tdb_chainunlock(tdb, key);
126         }
127
128         /* first the non-contended path */
129         if (ret == 0) {
130                 return 0;
131         }
132
133         state = talloc(hdr, struct lock_fetch_state);
134         state->ctdb = ctdb_db->ctdb;
135         state->hdr = hdr;
136         state->recv_pkt = recv_pkt;
137         state->recv_context = recv_context;
138         state->generation = ctdb_db->ctdb->vnn_map->generation;
139         state->ignore_generation = ignore_generation;
140
141         /* now the contended path */
142         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
143         if (h == NULL) {
144                 tdb_chainunlock(tdb, key);
145                 return -1;
146         }
147
148         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
149            so it won't be freed yet */
150         talloc_steal(state, hdr);
151         talloc_steal(state, h);
152
153         /* now tell the caller than we will retry asynchronously */
154         return -2;
155 }
156
157 /*
158   a varient of ctdb_ltdb_lock_requeue that also fetches the record
159  */
160 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
161                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
162                                  struct ctdb_req_header *hdr, TDB_DATA *data,
163                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
164                                  void *recv_context, bool ignore_generation)
165 {
166         int ret;
167
168         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
169                                      recv_context, ignore_generation);
170         if (ret == 0) {
171                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
172                 if (ret != 0) {
173                         int uret;
174                         uret = ctdb_ltdb_unlock(ctdb_db, key);
175                         if (uret != 0) {
176                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
177                         }
178                 }
179         }
180         return ret;
181 }
182
183
184 /*
185   paraoid check to see if the db is empty
186  */
187 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
188 {
189         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
190         int count = tdb_traverse_read(tdb, NULL, NULL);
191         if (count != 0) {
192                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
193                          ctdb_db->db_path));
194                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
195         }
196 }
197
198 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
199                                 struct ctdb_db_context *ctdb_db)
200 {
201         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
202         char *old;
203         char *reason = NULL;
204         TDB_DATA key;
205         TDB_DATA val;
206
207         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
208         key.dsize = strlen(ctdb_db->db_name);
209
210         old = ctdb_db->unhealthy_reason;
211         ctdb_db->unhealthy_reason = NULL;
212
213         val = tdb_fetch(tdb, key);
214         if (val.dsize > 0) {
215                 reason = talloc_strndup(ctdb_db,
216                                         (const char *)val.dptr,
217                                         val.dsize);
218                 if (reason == NULL) {
219                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
220                                            (int)val.dsize));
221                         ctdb_db->unhealthy_reason = old;
222                         free(val.dptr);
223                         return -1;
224                 }
225         }
226
227         if (val.dptr) {
228                 free(val.dptr);
229         }
230
231         talloc_free(old);
232         ctdb_db->unhealthy_reason = reason;
233         return 0;
234 }
235
236 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
237                                   struct ctdb_db_context *ctdb_db,
238                                   const char *given_reason,/* NULL means healthy */
239                                   int num_healthy_nodes)
240 {
241         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
242         int ret;
243         TDB_DATA key;
244         TDB_DATA val;
245         char *new_reason = NULL;
246         char *old_reason = NULL;
247
248         ret = tdb_transaction_start(tdb);
249         if (ret != 0) {
250                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
251                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
252                 return -1;
253         }
254
255         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
256         if (ret != 0) {
257                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
258                                    ctdb_db->db_name, ret));
259                 return -1;
260         }
261         old_reason = ctdb_db->unhealthy_reason;
262
263         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
264         key.dsize = strlen(ctdb_db->db_name);
265
266         if (given_reason) {
267                 new_reason = talloc_strdup(ctdb_db, given_reason);
268                 if (new_reason == NULL) {
269                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
270                                           given_reason));
271                         return -1;
272                 }
273         } else if (old_reason && num_healthy_nodes == 0) {
274                 /*
275                  * If the reason indicates ok, but there where no healthy nodes
276                  * available, that it means, we have not recovered valid content
277                  * of the db. So if there's an old reason, prefix it with
278                  * "NO-HEALTHY-NODES - "
279                  */
280                 const char *prefix;
281
282 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
283                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
284                 if (ret != 0) {
285                         prefix = _TMP_PREFIX;
286                 } else {
287                         prefix = "";
288                 }
289                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
290                                          prefix, old_reason);
291                 if (new_reason == NULL) {
292                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
293                                           prefix, old_reason));
294                         return -1;
295                 }
296 #undef _TMP_PREFIX
297         }
298
299         if (new_reason) {
300                 val.dptr = discard_const_p(uint8_t, new_reason);
301                 val.dsize = strlen(new_reason);
302
303                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
304                 if (ret != 0) {
305                         tdb_transaction_cancel(tdb);
306                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
307                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
308                                            ret, tdb_errorstr(tdb)));
309                         talloc_free(new_reason);
310                         return -1;
311                 }
312                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
313                                    ctdb_db->db_name, new_reason));
314         } else if (old_reason) {
315                 ret = tdb_delete(tdb, key);
316                 if (ret != 0) {
317                         tdb_transaction_cancel(tdb);
318                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
319                                            tdb_name(tdb), ctdb_db->db_name,
320                                            ret, tdb_errorstr(tdb)));
321                         talloc_free(new_reason);
322                         return -1;
323                 }
324                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
325                                    ctdb_db->db_name));
326         }
327
328         ret = tdb_transaction_commit(tdb);
329         if (ret != TDB_SUCCESS) {
330                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
331                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
332                 talloc_free(new_reason);
333                 return -1;
334         }
335
336         talloc_free(old_reason);
337         ctdb_db->unhealthy_reason = new_reason;
338
339         return 0;
340 }
341
342 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
343                                      struct ctdb_db_context *ctdb_db)
344 {
345         time_t now = time(NULL);
346         char *new_path;
347         char *new_reason;
348         int ret;
349         struct tm *tm;
350
351         tm = gmtime(&now);
352
353         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
354         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
355                                    "%04u%02u%02u%02u%02u%02u.0Z",
356                                    ctdb_db->db_path,
357                                    tm->tm_year+1900, tm->tm_mon+1,
358                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
359                                    tm->tm_sec);
360         if (new_path == NULL) {
361                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
362                 return -1;
363         }
364
365         new_reason = talloc_asprintf(ctdb_db,
366                                      "ERROR - Backup of corrupted TDB in '%s'",
367                                      new_path);
368         if (new_reason == NULL) {
369                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
370                 return -1;
371         }
372         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
373         talloc_free(new_reason);
374         if (ret != 0) {
375                 DEBUG(DEBUG_CRIT,(__location__
376                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
377                                  ctdb_db->db_path));
378                 return -1;
379         }
380
381         ret = rename(ctdb_db->db_path, new_path);
382         if (ret != 0) {
383                 DEBUG(DEBUG_CRIT,(__location__
384                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
385                                   ctdb_db->db_path, new_path,
386                                   errno, strerror(errno)));
387                 talloc_free(new_path);
388                 return -1;
389         }
390
391         DEBUG(DEBUG_CRIT,(__location__
392                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
393                          ctdb_db->db_path, new_path));
394         talloc_free(new_path);
395         return 0;
396 }
397
398 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
399 {
400         struct ctdb_db_context *ctdb_db;
401         int ret;
402         int ok = 0;
403         int fail = 0;
404
405         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
406                 if (!ctdb_db->persistent) {
407                         continue;
408                 }
409
410                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
411                 if (ret != 0) {
412                         DEBUG(DEBUG_ALERT,(__location__
413                                            " load persistent health for '%s' failed\n",
414                                            ctdb_db->db_path));
415                         return -1;
416                 }
417
418                 if (ctdb_db->unhealthy_reason == NULL) {
419                         ok++;
420                         DEBUG(DEBUG_INFO,(__location__
421                                    " persistent db '%s' healthy\n",
422                                    ctdb_db->db_path));
423                         continue;
424                 }
425
426                 fail++;
427                 DEBUG(DEBUG_ALERT,(__location__
428                                    " persistent db '%s' unhealthy: %s\n",
429                                    ctdb_db->db_path,
430                                    ctdb_db->unhealthy_reason));
431         }
432         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
433               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
434                ok, fail));
435
436         if (fail != 0) {
437                 return -1;
438         }
439
440         return 0;
441 }
442
443
444 /*
445   mark a database - as healthy
446  */
447 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
448 {
449         uint32_t db_id = *(uint32_t *)indata.dptr;
450         struct ctdb_db_context *ctdb_db;
451         int ret;
452         bool may_recover = false;
453
454         ctdb_db = find_ctdb_db(ctdb, db_id);
455         if (!ctdb_db) {
456                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
457                 return -1;
458         }
459
460         if (ctdb_db->unhealthy_reason) {
461                 may_recover = true;
462         }
463
464         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
465         if (ret != 0) {
466                 DEBUG(DEBUG_ERR,(__location__
467                                  " ctdb_update_persistent_health(%s) failed\n",
468                                  ctdb_db->db_name));
469                 return -1;
470         }
471
472         if (may_recover && !ctdb->done_startup) {
473                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
474                                   ctdb_db->db_name));
475                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
476         }
477
478         return 0;
479 }
480
481 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
482                                    TDB_DATA indata,
483                                    TDB_DATA *outdata)
484 {
485         uint32_t db_id = *(uint32_t *)indata.dptr;
486         struct ctdb_db_context *ctdb_db;
487         int ret;
488
489         ctdb_db = find_ctdb_db(ctdb, db_id);
490         if (!ctdb_db) {
491                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
492                 return -1;
493         }
494
495         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
496         if (ret != 0) {
497                 DEBUG(DEBUG_ERR,(__location__
498                                  " ctdb_load_persistent_health(%s) failed\n",
499                                  ctdb_db->db_name));
500                 return -1;
501         }
502
503         *outdata = tdb_null;
504         if (ctdb_db->unhealthy_reason) {
505                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
506                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
507         }
508
509         return 0;
510 }
511
512 /*
513   attach to a database, handling both persistent and non-persistent databases
514   return 0 on success, -1 on failure
515  */
516 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
517                              bool persistent, const char *unhealthy_reason)
518 {
519         struct ctdb_db_context *ctdb_db, *tmp_db;
520         int ret;
521         struct TDB_DATA key;
522         unsigned tdb_flags;
523         int mode = 0600;
524         int remaining_tries = 0;
525
526         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
527         CTDB_NO_MEMORY(ctdb, ctdb_db);
528
529         ctdb_db->priority = 1;
530         ctdb_db->ctdb = ctdb;
531         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
532         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
533
534         key.dsize = strlen(db_name)+1;
535         key.dptr  = discard_const(db_name);
536         ctdb_db->db_id = ctdb_hash(&key);
537         ctdb_db->persistent = persistent;
538
539         /* check for hash collisions */
540         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
541                 if (tmp_db->db_id == ctdb_db->db_id) {
542                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
543                                  tmp_db->db_id, db_name, tmp_db->db_name));
544                         talloc_free(ctdb_db);
545                         return -1;
546                 }
547         }
548
549         if (persistent) {
550                 if (unhealthy_reason) {
551                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
552                                                             unhealthy_reason, 0);
553                         if (ret != 0) {
554                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
555                                                    ctdb_db->db_name, unhealthy_reason, ret));
556                                 talloc_free(ctdb_db);
557                                 return -1;
558                         }
559                 }
560
561                 if (ctdb->max_persistent_check_errors > 0) {
562                         remaining_tries = 1;
563                 }
564                 if (ctdb->done_startup) {
565                         remaining_tries = 0;
566                 }
567
568                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
569                 if (ret != 0) {
570                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
571                                    ctdb_db->db_name, ret));
572                         talloc_free(ctdb_db);
573                         return -1;
574                 }
575         }
576
577         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
578                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
579                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
580                 talloc_free(ctdb_db);
581                 return -1;
582         }
583
584         if (ctdb_db->unhealthy_reason) {
585                 /* this is just a warning, but we want that in the log file! */
586                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
587                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
588         }
589
590         /* open the database */
591         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
592                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
593                                            db_name, ctdb->pnn);
594
595         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
596         if (ctdb->valgrinding) {
597                 tdb_flags |= TDB_NOMMAP;
598         }
599         tdb_flags |= TDB_DISALLOW_NESTING;
600
601 again:
602         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
603                                       ctdb->tunable.database_hash_size, 
604                                       tdb_flags, 
605                                       O_CREAT|O_RDWR, mode);
606         if (ctdb_db->ltdb == NULL) {
607                 struct stat st;
608                 int saved_errno = errno;
609
610                 if (!persistent) {
611                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
612                                           ctdb_db->db_path,
613                                           saved_errno,
614                                           strerror(saved_errno)));
615                         talloc_free(ctdb_db);
616                         return -1;
617                 }
618
619                 if (remaining_tries == 0) {
620                         DEBUG(DEBUG_CRIT,(__location__
621                                           "Failed to open persistent tdb '%s': %d - %s\n",
622                                           ctdb_db->db_path,
623                                           saved_errno,
624                                           strerror(saved_errno)));
625                         talloc_free(ctdb_db);
626                         return -1;
627                 }
628
629                 ret = stat(ctdb_db->db_path, &st);
630                 if (ret != 0) {
631                         DEBUG(DEBUG_CRIT,(__location__
632                                           "Failed to open persistent tdb '%s': %d - %s\n",
633                                           ctdb_db->db_path,
634                                           saved_errno,
635                                           strerror(saved_errno)));
636                         talloc_free(ctdb_db);
637                         return -1;
638                 }
639
640                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
641                 if (ret != 0) {
642                         DEBUG(DEBUG_CRIT,(__location__
643                                           "Failed to open persistent tdb '%s': %d - %s\n",
644                                           ctdb_db->db_path,
645                                           saved_errno,
646                                           strerror(saved_errno)));
647                         talloc_free(ctdb_db);
648                         return -1;
649                 }
650
651                 remaining_tries--;
652                 mode = st.st_mode;
653                 goto again;
654         }
655
656         if (!persistent) {
657                 ctdb_check_db_empty(ctdb_db);
658         } else {
659                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
660                 if (ret != 0) {
661                         int fd;
662                         struct stat st;
663
664                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
665                                           ctdb_db->db_path, ret,
666                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
667                         if (remaining_tries == 0) {
668                                 talloc_free(ctdb_db);
669                                 return -1;
670                         }
671
672                         fd = tdb_fd(ctdb_db->ltdb->tdb);
673                         ret = fstat(fd, &st);
674                         if (ret != 0) {
675                                 DEBUG(DEBUG_CRIT,(__location__
676                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
677                                                   ctdb_db->db_path,
678                                                   errno,
679                                                   strerror(errno)));
680                                 talloc_free(ctdb_db);
681                                 return -1;
682                         }
683
684                         /* close the TDB */
685                         talloc_free(ctdb_db->ltdb);
686                         ctdb_db->ltdb = NULL;
687
688                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
689                         if (ret != 0) {
690                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
691                                                   ctdb_db->db_path));
692                                 talloc_free(ctdb_db);
693                                 return -1;
694                         }
695
696                         remaining_tries--;
697                         mode = st.st_mode;
698                         goto again;
699                 }
700         }
701
702         DLIST_ADD(ctdb->db_list, ctdb_db);
703
704         /* setting this can help some high churn databases */
705         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
706
707         /* 
708            all databases support the "null" function. we need this in
709            order to do forced migration of records
710         */
711         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
712         if (ret != 0) {
713                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
714                 talloc_free(ctdb_db);
715                 return -1;
716         }
717
718         /* 
719            all databases support the "fetch" function. we need this
720            for efficient Samba3 ctdb fetch
721         */
722         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
723         if (ret != 0) {
724                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
725                 talloc_free(ctdb_db);
726                 return -1;
727         }
728
729         ret = ctdb_vacuum_init(ctdb_db);
730         if (ret != 0) {
731                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
732                                   "database '%s'\n", ctdb_db->db_name));
733                 talloc_free(ctdb_db);
734                 return -1;
735         }
736
737
738         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
739         
740         /* success */
741         return 0;
742 }
743
744
745 /*
746   a client has asked to attach a new database
747  */
748 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
749                                TDB_DATA *outdata, uint64_t tdb_flags, 
750                                bool persistent)
751 {
752         const char *db_name = (const char *)indata.dptr;
753         struct ctdb_db_context *db;
754         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
755
756         /* the client can optionally pass additional tdb flags, but we
757            only allow a subset of those on the database in ctdb. Note
758            that tdb_flags is passed in via the (otherwise unused)
759            srvid to the attach control */
760         tdb_flags &= TDB_NOSYNC;
761
762         /* If the node is inactive it is not part of the cluster
763            and we should not allow clients to attach to any
764            databases
765         */
766         if (node->flags & NODE_FLAGS_INACTIVE) {
767                 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
768                 return -1;
769         }
770
771
772         /* see if we already have this name */
773         db = ctdb_db_handle(ctdb, db_name);
774         if (db) {
775                 outdata->dptr  = (uint8_t *)&db->db_id;
776                 outdata->dsize = sizeof(db->db_id);
777                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
778                 return 0;
779         }
780
781         if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
782                 return -1;
783         }
784
785         db = ctdb_db_handle(ctdb, db_name);
786         if (!db) {
787                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
788                 return -1;
789         }
790
791         /* remember the flags the client has specified */
792         tdb_add_flags(db->ltdb->tdb, tdb_flags);
793
794         outdata->dptr  = (uint8_t *)&db->db_id;
795         outdata->dsize = sizeof(db->db_id);
796
797         /* Try to ensure it's locked in mem */
798         ctdb_lockdown_memory(ctdb);
799
800         /* tell all the other nodes about this database */
801         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
802                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
803                                                 CTDB_CONTROL_DB_ATTACH,
804                                  0, CTDB_CTRL_FLAG_NOREPLY,
805                                  indata, NULL, NULL);
806
807         /* success */
808         return 0;
809 }
810
811
812 /*
813   attach to all existing persistent databases
814  */
815 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
816                                   const char *unhealthy_reason)
817 {
818         DIR *d;
819         struct dirent *de;
820
821         /* open the persistent db directory and scan it for files */
822         d = opendir(ctdb->db_directory_persistent);
823         if (d == NULL) {
824                 return 0;
825         }
826
827         while ((de=readdir(d))) {
828                 char *p, *s, *q;
829                 size_t len = strlen(de->d_name);
830                 uint32_t node;
831                 int invalid_name = 0;
832                 
833                 s = talloc_strdup(ctdb, de->d_name);
834                 CTDB_NO_MEMORY(ctdb, s);
835
836                 /* only accept names ending in .tdb */
837                 p = strstr(s, ".tdb.");
838                 if (len < 7 || p == NULL) {
839                         talloc_free(s);
840                         continue;
841                 }
842
843                 /* only accept names ending with .tdb. and any number of digits */
844                 q = p+5;
845                 while (*q != 0 && invalid_name == 0) {
846                         if (!isdigit(*q++)) {
847                                 invalid_name = 1;
848                         }
849                 }
850                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
851                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
852                         talloc_free(s);
853                         continue;
854                 }
855                 p[4] = 0;
856
857                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
858                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
859                         closedir(d);
860                         talloc_free(s);
861                         return -1;
862                 }
863
864                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
865
866                 talloc_free(s);
867         }
868         closedir(d);
869         return 0;
870 }
871
872 int ctdb_attach_databases(struct ctdb_context *ctdb)
873 {
874         int ret;
875         char *persistent_health_path = NULL;
876         char *unhealthy_reason = NULL;
877         bool first_try = true;
878
879         if (ctdb->db_directory == NULL) {
880                 ctdb->db_directory = VARDIR "/ctdb";
881         }
882         if (ctdb->db_directory_persistent == NULL) {
883                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
884         }
885         if (ctdb->db_directory_state == NULL) {
886                 ctdb->db_directory_state = VARDIR "/ctdb/state";
887         }
888
889         /* make sure the db directory exists */
890         ret = mkdir(ctdb->db_directory, 0700);
891         if (ret == -1 && errno != EEXIST) {
892                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
893                          ctdb->db_directory));
894                 return -1;
895         }
896
897         /* make sure the persistent db directory exists */
898         ret = mkdir(ctdb->db_directory_persistent, 0700);
899         if (ret == -1 && errno != EEXIST) {
900                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
901                          ctdb->db_directory_persistent));
902                 return -1;
903         }
904
905         /* make sure the internal state db directory exists */
906         ret = mkdir(ctdb->db_directory_state, 0700);
907         if (ret == -1 && errno != EEXIST) {
908                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
909                          ctdb->db_directory_state));
910                 return -1;
911         }
912
913         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
914                                                  ctdb->db_directory_state,
915                                                  PERSISTENT_HEALTH_TDB,
916                                                  ctdb->pnn);
917         if (persistent_health_path == NULL) {
918                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
919                 return -1;
920         }
921
922 again:
923
924         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
925                                                    0, TDB_DISALLOW_NESTING,
926                                                    O_CREAT | O_RDWR, 0600);
927         if (ctdb->db_persistent_health == NULL) {
928                 struct tdb_wrap *tdb;
929
930                 if (!first_try) {
931                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
932                                           persistent_health_path,
933                                           errno,
934                                           strerror(errno)));
935                         talloc_free(persistent_health_path);
936                         talloc_free(unhealthy_reason);
937                         return -1;
938                 }
939                 first_try = false;
940
941                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
942                                                    persistent_health_path,
943                                                    "was cleared after a failure",
944                                                    "manual verification needed");
945                 if (unhealthy_reason == NULL) {
946                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
947                         talloc_free(persistent_health_path);
948                         return -1;
949                 }
950
951                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
952                                   persistent_health_path));
953                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
954                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
955                                     O_CREAT | O_RDWR, 0600);
956                 if (tdb) {
957                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
958                                           persistent_health_path,
959                                           errno,
960                                           strerror(errno)));
961                         talloc_free(persistent_health_path);
962                         talloc_free(unhealthy_reason);
963                         return -1;
964                 }
965
966                 talloc_free(tdb);
967                 goto again;
968         }
969         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
970         if (ret != 0) {
971                 struct tdb_wrap *tdb;
972
973                 talloc_free(ctdb->db_persistent_health);
974                 ctdb->db_persistent_health = NULL;
975
976                 if (!first_try) {
977                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
978                                           persistent_health_path));
979                         talloc_free(persistent_health_path);
980                         talloc_free(unhealthy_reason);
981                         return -1;
982                 }
983                 first_try = false;
984
985                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
986                                                    persistent_health_path,
987                                                    "was cleared after a failure",
988                                                    "manual verification needed");
989                 if (unhealthy_reason == NULL) {
990                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
991                         talloc_free(persistent_health_path);
992                         return -1;
993                 }
994
995                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
996                                   persistent_health_path));
997                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
998                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
999                                     O_CREAT | O_RDWR, 0600);
1000                 if (tdb) {
1001                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1002                                           persistent_health_path,
1003                                           errno,
1004                                           strerror(errno)));
1005                         talloc_free(persistent_health_path);
1006                         talloc_free(unhealthy_reason);
1007                         return -1;
1008                 }
1009
1010                 talloc_free(tdb);
1011                 goto again;
1012         }
1013         talloc_free(persistent_health_path);
1014
1015         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1016         talloc_free(unhealthy_reason);
1017         if (ret != 0) {
1018                 return ret;
1019         }
1020
1021         return 0;
1022 }
1023
1024 /*
1025   called when a broadcast seqnum update comes in
1026  */
1027 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1028 {
1029         struct ctdb_db_context *ctdb_db;
1030         if (srcnode == ctdb->pnn) {
1031                 /* don't update ourselves! */
1032                 return 0;
1033         }
1034
1035         ctdb_db = find_ctdb_db(ctdb, db_id);
1036         if (!ctdb_db) {
1037                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1038                 return -1;
1039         }
1040
1041         if (ctdb_db->unhealthy_reason) {
1042                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1043                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1044                 return -1;
1045         }
1046
1047         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1048         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1049         return 0;
1050 }
1051
1052 /*
1053   timer to check for seqnum changes in a ltdb and propogate them
1054  */
1055 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1056                                    struct timeval t, void *p)
1057 {
1058         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1059         struct ctdb_context *ctdb = ctdb_db->ctdb;
1060         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1061         if (new_seqnum != ctdb_db->seqnum) {
1062                 /* something has changed - propogate it */
1063                 TDB_DATA data;
1064                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1065                 data.dsize = sizeof(uint32_t);
1066                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1067                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1068                                          data, NULL, NULL);             
1069         }
1070         ctdb_db->seqnum = new_seqnum;
1071
1072         /* setup a new timer */
1073         ctdb_db->seqnum_update =
1074                 event_add_timed(ctdb->ev, ctdb_db, 
1075                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1076                                 ctdb_ltdb_seqnum_check, ctdb_db);
1077 }
1078
1079 /*
1080   enable seqnum handling on this db
1081  */
1082 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1083 {
1084         struct ctdb_db_context *ctdb_db;
1085         ctdb_db = find_ctdb_db(ctdb, db_id);
1086         if (!ctdb_db) {
1087                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1088                 return -1;
1089         }
1090
1091         if (ctdb_db->seqnum_update == NULL) {
1092                 ctdb_db->seqnum_update =
1093                         event_add_timed(ctdb->ev, ctdb_db, 
1094                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1095                                         ctdb_ltdb_seqnum_check, ctdb_db);
1096         }
1097
1098         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1099         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1100         return 0;
1101 }
1102
1103 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1104 {
1105         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1106         struct ctdb_db_context *ctdb_db;
1107
1108         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1109         if (!ctdb_db) {
1110                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1111                 return -1;
1112         }
1113
1114         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1115                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1116                 return -1;
1117         }
1118
1119         ctdb_db->priority = db_prio->priority;
1120         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1121
1122         return 0;
1123 }
1124
1125