Don't return an error when trying to set the db priority on a db that does not yet exist.
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "db_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
33
/*
  No-op call handler that is registered for every database.
  It ignores the call info entirely and always reports success (0).
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
        (void)call;

        return 0;
}
41
42 /*
43   this is a plain fetch procedure that all databases support
44 */
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
46 {
47         call->reply_data = &call->record_data;
48         return 0;
49 }
50
51
52
/*
  State kept for a request that is deferred until a chainlock becomes
  available (see ctdb_ltdb_lock_requeue / lock_fetch_callback).
*/
struct lock_fetch_state {
        /* daemon context; used to read the current vnn_map generation */
        struct ctdb_context *ctdb;
        /* function that re-submits the deferred packet */
        void (*recv_pkt)(void *, struct ctdb_req_header *);
        /* first argument handed to recv_pkt */
        void *recv_context;
        /* the deferred request itself */
        struct ctdb_req_header *hdr;
        /* vnn_map generation at the time the request was deferred */
        uint32_t generation;
        /* when true, re-submit even if the generation has changed */
        bool ignore_generation;
};
61
62 /*
63   called when we should retry the operation
64  */
65 static void lock_fetch_callback(void *p)
66 {
67         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
68         if (!state->ignore_generation &&
69             state->generation != state->ctdb->vnn_map->generation) {
70                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71                 talloc_free(state->hdr);
72                 return;
73         }
74         state->recv_pkt(state->recv_context, state->hdr);
75         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
76 }
77
78
79 /*
80   do a non-blocking ltdb_lock, deferring this ctdb request until we
81   have the chainlock
82
83   It does the following:
84
85    1) tries to get the chainlock. If it succeeds, then it returns 0
86
87    2) if it fails to get a chainlock immediately then it sets up a
88    non-blocking chainlock via ctdb_lockwait, and when it gets the
89    chainlock it re-submits this ctdb request to the main packet
90    receive function
91
92    This effectively queues all ctdb requests that cannot be
93    immediately satisfied until it can get the lock. This means that
94    the main ctdb daemon will not block waiting for a chainlock held by
95    a client
96
97    There are 3 possible return values:
98
99        0:    means that it got the lock immediately.
100       -1:    means that it failed to get the lock, and won't retry
101       -2:    means that it failed to get the lock immediately, but will retry
102  */
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
104                            TDB_DATA key, struct ctdb_req_header *hdr,
105                            void (*recv_pkt)(void *, struct ctdb_req_header *),
106                            void *recv_context, bool ignore_generation)
107 {
108         int ret;
109         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110         struct lockwait_handle *h;
111         struct lock_fetch_state *state;
112         
113         ret = tdb_chainlock_nonblock(tdb, key);
114
115         if (ret != 0 &&
116             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117                 /* a hard failure - don't try again */
118                 return -1;
119         }
120
121         /* when torturing, ensure we test the contended path */
122         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
123             random() % 5 == 0) {
124                 ret = -1;
125                 tdb_chainunlock(tdb, key);
126         }
127
128         /* first the non-contended path */
129         if (ret == 0) {
130                 return 0;
131         }
132
133         state = talloc(hdr, struct lock_fetch_state);
134         state->ctdb = ctdb_db->ctdb;
135         state->hdr = hdr;
136         state->recv_pkt = recv_pkt;
137         state->recv_context = recv_context;
138         state->generation = ctdb_db->ctdb->vnn_map->generation;
139         state->ignore_generation = ignore_generation;
140
141         /* now the contended path */
142         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
143         if (h == NULL) {
144                 return -1;
145         }
146
147         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
148            so it won't be freed yet */
149         talloc_steal(state, hdr);
150         talloc_steal(state, h);
151
152         /* now tell the caller than we will retry asynchronously */
153         return -2;
154 }
155
156 /*
157   a varient of ctdb_ltdb_lock_requeue that also fetches the record
158  */
159 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
160                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
161                                  struct ctdb_req_header *hdr, TDB_DATA *data,
162                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
163                                  void *recv_context, bool ignore_generation)
164 {
165         int ret;
166
167         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
168                                      recv_context, ignore_generation);
169         if (ret == 0) {
170                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
171                 if (ret != 0) {
172                         int uret;
173                         uret = ctdb_ltdb_unlock(ctdb_db, key);
174                         if (uret != 0) {
175                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
176                         }
177                 }
178         }
179         return ret;
180 }
181
182
183 /*
184   paraoid check to see if the db is empty
185  */
186 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
187 {
188         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
189         int count = tdb_traverse_read(tdb, NULL, NULL);
190         if (count != 0) {
191                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
192                          ctdb_db->db_path));
193                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
194         }
195 }
196
197 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
198                                 struct ctdb_db_context *ctdb_db)
199 {
200         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
201         char *old;
202         char *reason = NULL;
203         TDB_DATA key;
204         TDB_DATA val;
205
206         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
207         key.dsize = strlen(ctdb_db->db_name);
208
209         old = ctdb_db->unhealthy_reason;
210         ctdb_db->unhealthy_reason = NULL;
211
212         val = tdb_fetch(tdb, key);
213         if (val.dsize > 0) {
214                 reason = talloc_strndup(ctdb_db,
215                                         (const char *)val.dptr,
216                                         val.dsize);
217                 if (reason == NULL) {
218                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
219                                            (int)val.dsize));
220                         ctdb_db->unhealthy_reason = old;
221                         free(val.dptr);
222                         return -1;
223                 }
224         }
225
226         if (val.dptr) {
227                 free(val.dptr);
228         }
229
230         talloc_free(old);
231         ctdb_db->unhealthy_reason = reason;
232         return 0;
233 }
234
235 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
236                                   struct ctdb_db_context *ctdb_db,
237                                   const char *given_reason,/* NULL means healthy */
238                                   int num_healthy_nodes)
239 {
240         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
241         int ret;
242         TDB_DATA key;
243         TDB_DATA val;
244         char *new_reason = NULL;
245         char *old_reason = NULL;
246
247         ret = tdb_transaction_start(tdb);
248         if (ret != 0) {
249                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
250                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
251                 return -1;
252         }
253
254         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
255         if (ret != 0) {
256                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
257                                    ctdb_db->db_name, ret));
258                 return -1;
259         }
260         old_reason = ctdb_db->unhealthy_reason;
261
262         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
263         key.dsize = strlen(ctdb_db->db_name);
264
265         if (given_reason) {
266                 new_reason = talloc_strdup(ctdb_db, given_reason);
267                 if (new_reason == NULL) {
268                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
269                                           given_reason));
270                         return -1;
271                 }
272         } else if (old_reason && num_healthy_nodes == 0) {
273                 /*
274                  * If the reason indicates ok, but there where no healthy nodes
275                  * available, that it means, we have not recovered valid content
276                  * of the db. So if there's an old reason, prefix it with
277                  * "NO-HEALTHY-NODES - "
278                  */
279                 const char *prefix;
280
281 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
282                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
283                 if (ret != 0) {
284                         prefix = _TMP_PREFIX;
285                 } else {
286                         prefix = "";
287                 }
288                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
289                                          prefix, old_reason);
290                 if (new_reason == NULL) {
291                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
292                                           prefix, old_reason));
293                         return -1;
294                 }
295 #undef _TMP_PREFIX
296         }
297
298         if (new_reason) {
299                 val.dptr = discard_const_p(uint8_t, new_reason);
300                 val.dsize = strlen(new_reason);
301
302                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
303                 if (ret != 0) {
304                         tdb_transaction_cancel(tdb);
305                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
306                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
307                                            ret, tdb_errorstr(tdb)));
308                         talloc_free(new_reason);
309                         return -1;
310                 }
311                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
312                                    ctdb_db->db_name, new_reason));
313         } else if (old_reason) {
314                 ret = tdb_delete(tdb, key);
315                 if (ret != 0) {
316                         tdb_transaction_cancel(tdb);
317                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
318                                            tdb_name(tdb), ctdb_db->db_name,
319                                            ret, tdb_errorstr(tdb)));
320                         talloc_free(new_reason);
321                         return -1;
322                 }
323                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
324                                    ctdb_db->db_name));
325         }
326
327         ret = tdb_transaction_commit(tdb);
328         if (ret != TDB_SUCCESS) {
329                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
330                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
331                 talloc_free(new_reason);
332                 return -1;
333         }
334
335         talloc_free(old_reason);
336         ctdb_db->unhealthy_reason = new_reason;
337
338         return 0;
339 }
340
341 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
342                                      struct ctdb_db_context *ctdb_db)
343 {
344         time_t now = time(NULL);
345         char *new_path;
346         char *new_reason;
347         int ret;
348         struct tm *tm;
349
350         tm = gmtime(&now);
351
352         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
353         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
354                                    "%04u%02u%02u%02u%02u%02u.0Z",
355                                    ctdb_db->db_path,
356                                    tm->tm_year+1900, tm->tm_mon+1,
357                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
358                                    tm->tm_sec);
359         if (new_path == NULL) {
360                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
361                 return -1;
362         }
363
364         new_reason = talloc_asprintf(ctdb_db,
365                                      "ERROR - Backup of corrupted TDB in '%s'",
366                                      new_path);
367         if (new_reason == NULL) {
368                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
369                 return -1;
370         }
371         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
372         talloc_free(new_reason);
373         if (ret != 0) {
374                 DEBUG(DEBUG_CRIT,(__location__
375                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
376                                  ctdb_db->db_path));
377                 return -1;
378         }
379
380         ret = rename(ctdb_db->db_path, new_path);
381         if (ret != 0) {
382                 DEBUG(DEBUG_CRIT,(__location__
383                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
384                                   ctdb_db->db_path, new_path,
385                                   errno, strerror(errno)));
386                 talloc_free(new_path);
387                 return -1;
388         }
389
390         DEBUG(DEBUG_CRIT,(__location__
391                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
392                          ctdb_db->db_path, new_path));
393         talloc_free(new_path);
394         return 0;
395 }
396
397 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
398 {
399         struct ctdb_db_context *ctdb_db;
400         int ret;
401         int ok = 0;
402         int fail = 0;
403
404         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
405                 if (!ctdb_db->persistent) {
406                         continue;
407                 }
408
409                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
410                 if (ret != 0) {
411                         DEBUG(DEBUG_ALERT,(__location__
412                                            " load persistent health for '%s' failed\n",
413                                            ctdb_db->db_path));
414                         return -1;
415                 }
416
417                 if (ctdb_db->unhealthy_reason == NULL) {
418                         ok++;
419                         DEBUG(DEBUG_INFO,(__location__
420                                    " persistent db '%s' healthy\n",
421                                    ctdb_db->db_path));
422                         continue;
423                 }
424
425                 fail++;
426                 DEBUG(DEBUG_ALERT,(__location__
427                                    " persistent db '%s' unhealthy: %s\n",
428                                    ctdb_db->db_path,
429                                    ctdb_db->unhealthy_reason));
430         }
431         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
432               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
433                ok, fail));
434
435         if (fail != 0) {
436                 return -1;
437         }
438
439         return 0;
440 }
441
442
443 /*
444   mark a database - as healthy
445  */
446 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
447 {
448         uint32_t db_id = *(uint32_t *)indata.dptr;
449         struct ctdb_db_context *ctdb_db;
450         int ret;
451         bool may_recover = false;
452
453         ctdb_db = find_ctdb_db(ctdb, db_id);
454         if (!ctdb_db) {
455                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
456                 return -1;
457         }
458
459         if (ctdb_db->unhealthy_reason) {
460                 may_recover = true;
461         }
462
463         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
464         if (ret != 0) {
465                 DEBUG(DEBUG_ERR,(__location__
466                                  " ctdb_update_persistent_health(%s) failed\n",
467                                  ctdb_db->db_name));
468                 return -1;
469         }
470
471         if (may_recover && !ctdb->done_startup) {
472                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
473                                   ctdb_db->db_name));
474                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
475         }
476
477         return 0;
478 }
479
480 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
481                                    TDB_DATA indata,
482                                    TDB_DATA *outdata)
483 {
484         uint32_t db_id = *(uint32_t *)indata.dptr;
485         struct ctdb_db_context *ctdb_db;
486         int ret;
487
488         ctdb_db = find_ctdb_db(ctdb, db_id);
489         if (!ctdb_db) {
490                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
491                 return -1;
492         }
493
494         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
495         if (ret != 0) {
496                 DEBUG(DEBUG_ERR,(__location__
497                                  " ctdb_load_persistent_health(%s) failed\n",
498                                  ctdb_db->db_name));
499                 return -1;
500         }
501
502         *outdata = tdb_null;
503         if (ctdb_db->unhealthy_reason) {
504                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
505                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
506         }
507
508         return 0;
509 }
510
511 /*
512   attach to a database, handling both persistent and non-persistent databases
513   return 0 on success, -1 on failure
514  */
515 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
516                              bool persistent, const char *unhealthy_reason,
517                              bool jenkinshash)
518 {
519         struct ctdb_db_context *ctdb_db, *tmp_db;
520         int ret;
521         struct TDB_DATA key;
522         unsigned tdb_flags;
523         int mode = 0600;
524         int remaining_tries = 0;
525
526         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
527         CTDB_NO_MEMORY(ctdb, ctdb_db);
528
529         ctdb_db->priority = 1;
530         ctdb_db->ctdb = ctdb;
531         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
532         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
533
534         key.dsize = strlen(db_name)+1;
535         key.dptr  = discard_const(db_name);
536         ctdb_db->db_id = ctdb_hash(&key);
537         ctdb_db->persistent = persistent;
538
539         /* check for hash collisions */
540         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
541                 if (tmp_db->db_id == ctdb_db->db_id) {
542                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
543                                  tmp_db->db_id, db_name, tmp_db->db_name));
544                         talloc_free(ctdb_db);
545                         return -1;
546                 }
547         }
548
549         if (persistent) {
550                 if (unhealthy_reason) {
551                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
552                                                             unhealthy_reason, 0);
553                         if (ret != 0) {
554                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
555                                                    ctdb_db->db_name, unhealthy_reason, ret));
556                                 talloc_free(ctdb_db);
557                                 return -1;
558                         }
559                 }
560
561                 if (ctdb->max_persistent_check_errors > 0) {
562                         remaining_tries = 1;
563                 }
564                 if (ctdb->done_startup) {
565                         remaining_tries = 0;
566                 }
567
568                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
569                 if (ret != 0) {
570                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
571                                    ctdb_db->db_name, ret));
572                         talloc_free(ctdb_db);
573                         return -1;
574                 }
575         }
576
577         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
578                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
579                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
580                 talloc_free(ctdb_db);
581                 return -1;
582         }
583
584         if (ctdb_db->unhealthy_reason) {
585                 /* this is just a warning, but we want that in the log file! */
586                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
587                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
588         }
589
590         /* open the database */
591         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
592                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
593                                            db_name, ctdb->pnn);
594
595         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
596         if (ctdb->valgrinding) {
597                 tdb_flags |= TDB_NOMMAP;
598         }
599         tdb_flags |= TDB_DISALLOW_NESTING;
600         if (jenkinshash) {
601                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
602         }
603
604 again:
605         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
606                                       ctdb->tunable.database_hash_size, 
607                                       tdb_flags, 
608                                       O_CREAT|O_RDWR, mode);
609         if (ctdb_db->ltdb == NULL) {
610                 struct stat st;
611                 int saved_errno = errno;
612
613                 if (!persistent) {
614                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
615                                           ctdb_db->db_path,
616                                           saved_errno,
617                                           strerror(saved_errno)));
618                         talloc_free(ctdb_db);
619                         return -1;
620                 }
621
622                 if (remaining_tries == 0) {
623                         DEBUG(DEBUG_CRIT,(__location__
624                                           "Failed to open persistent tdb '%s': %d - %s\n",
625                                           ctdb_db->db_path,
626                                           saved_errno,
627                                           strerror(saved_errno)));
628                         talloc_free(ctdb_db);
629                         return -1;
630                 }
631
632                 ret = stat(ctdb_db->db_path, &st);
633                 if (ret != 0) {
634                         DEBUG(DEBUG_CRIT,(__location__
635                                           "Failed to open persistent tdb '%s': %d - %s\n",
636                                           ctdb_db->db_path,
637                                           saved_errno,
638                                           strerror(saved_errno)));
639                         talloc_free(ctdb_db);
640                         return -1;
641                 }
642
643                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
644                 if (ret != 0) {
645                         DEBUG(DEBUG_CRIT,(__location__
646                                           "Failed to open persistent tdb '%s': %d - %s\n",
647                                           ctdb_db->db_path,
648                                           saved_errno,
649                                           strerror(saved_errno)));
650                         talloc_free(ctdb_db);
651                         return -1;
652                 }
653
654                 remaining_tries--;
655                 mode = st.st_mode;
656                 goto again;
657         }
658
659         if (!persistent) {
660                 ctdb_check_db_empty(ctdb_db);
661         } else {
662                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
663                 if (ret != 0) {
664                         int fd;
665                         struct stat st;
666
667                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
668                                           ctdb_db->db_path, ret,
669                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
670                         if (remaining_tries == 0) {
671                                 talloc_free(ctdb_db);
672                                 return -1;
673                         }
674
675                         fd = tdb_fd(ctdb_db->ltdb->tdb);
676                         ret = fstat(fd, &st);
677                         if (ret != 0) {
678                                 DEBUG(DEBUG_CRIT,(__location__
679                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
680                                                   ctdb_db->db_path,
681                                                   errno,
682                                                   strerror(errno)));
683                                 talloc_free(ctdb_db);
684                                 return -1;
685                         }
686
687                         /* close the TDB */
688                         talloc_free(ctdb_db->ltdb);
689                         ctdb_db->ltdb = NULL;
690
691                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
692                         if (ret != 0) {
693                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
694                                                   ctdb_db->db_path));
695                                 talloc_free(ctdb_db);
696                                 return -1;
697                         }
698
699                         remaining_tries--;
700                         mode = st.st_mode;
701                         goto again;
702                 }
703         }
704
705         DLIST_ADD(ctdb->db_list, ctdb_db);
706
707         /* setting this can help some high churn databases */
708         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
709
710         /* 
711            all databases support the "null" function. we need this in
712            order to do forced migration of records
713         */
714         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
715         if (ret != 0) {
716                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
717                 talloc_free(ctdb_db);
718                 return -1;
719         }
720
721         /* 
722            all databases support the "fetch" function. we need this
723            for efficient Samba3 ctdb fetch
724         */
725         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
726         if (ret != 0) {
727                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
728                 talloc_free(ctdb_db);
729                 return -1;
730         }
731
732         ret = ctdb_vacuum_init(ctdb_db);
733         if (ret != 0) {
734                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
735                                   "database '%s'\n", ctdb_db->db_name));
736                 talloc_free(ctdb_db);
737                 return -1;
738         }
739
740
741         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
742         
743         /* success */
744         return 0;
745 }
746
747
/*
  One deferred attach request, kept on the ctdb->deferred_attach list
  until it is either re-submitted or answered with an error.
*/
struct ctdb_deferred_attach_context {
        /* doubly linked list pointers (ctdb->deferred_attach) */
        struct ctdb_deferred_attach_context *next, *prev;
        /* owning daemon context */
        struct ctdb_context *ctdb;
        /* the control request that has been deferred */
        struct ctdb_req_control *c;
};
753
754
755 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
756 {
757         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
758
759         return 0;
760 }
761
762 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
763 {
764         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
765         struct ctdb_context *ctdb = da_ctx->ctdb;
766
767         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
768         talloc_free(da_ctx);
769 }
770
771 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
772 {
773         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
774         struct ctdb_context *ctdb = da_ctx->ctdb;
775
776         /* This talloc-steals the packet ->c */
777         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
778         talloc_free(da_ctx);
779 }
780
781 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
782 {
783         struct ctdb_deferred_attach_context *da_ctx;
784
785         /* call it from the main event loop as soon as the current event 
786            finishes.
787          */
788         while ((da_ctx = ctdb->deferred_attach) != NULL) {
789                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
790                 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
791         }
792
793         return 0;
794 }
795
796 /*
797   a client has asked to attach a new database
798  */
799 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
800                                TDB_DATA *outdata, uint64_t tdb_flags, 
801                                bool persistent, uint32_t client_id,
802                                struct ctdb_req_control *c,
803                                bool *async_reply)
804 {
805         const char *db_name = (const char *)indata.dptr;
806         struct ctdb_db_context *db;
807         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
808
809         /* dont allow any local clients to attach while we are in recovery mode
810          * except for the recovery daemon.
811          * allow all attach from the network since these are always from remote
812          * recovery daemons.
813          */
814         if (client_id != 0) {
815                 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
816
817                 if (client == NULL) {
818                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused. Can not match clientid:%d to a client structure.\n", db_name, client_id));
819                         return -1;
820                 }
821
822                 /* If the node is inactive it is not part of the cluster
823                    and we should not allow clients to attach to any
824                    databases
825                 */
826                 if (node->flags & NODE_FLAGS_INACTIVE) {
827                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
828                         return -1;
829                 }
830
831                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
832                  && client->pid != ctdb->recoverd_pid) {
833                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
834
835                         if (da_ctx == NULL) {
836                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
837                                 return -1;
838                         }
839
840                         da_ctx->ctdb = ctdb;
841                         da_ctx->c = talloc_steal(da_ctx, c);
842                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
843                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
844
845                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
846
847                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
848                         *async_reply = true;
849                         return 0;
850                 }
851         }
852
853         /* the client can optionally pass additional tdb flags, but we
854            only allow a subset of those on the database in ctdb. Note
855            that tdb_flags is passed in via the (otherwise unused)
856            srvid to the attach control */
857         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
858
859         /* see if we already have this name */
860         db = ctdb_db_handle(ctdb, db_name);
861         if (db) {
862                 outdata->dptr  = (uint8_t *)&db->db_id;
863                 outdata->dsize = sizeof(db->db_id);
864                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
865                 return 0;
866         }
867
868         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
869                 return -1;
870         }
871
872         db = ctdb_db_handle(ctdb, db_name);
873         if (!db) {
874                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
875                 return -1;
876         }
877
878         /* remember the flags the client has specified */
879         tdb_add_flags(db->ltdb->tdb, tdb_flags);
880
881         outdata->dptr  = (uint8_t *)&db->db_id;
882         outdata->dsize = sizeof(db->db_id);
883
884         /* Try to ensure it's locked in mem */
885         ctdb_lockdown_memory(ctdb);
886
887         /* tell all the other nodes about this database */
888         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
889                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
890                                                 CTDB_CONTROL_DB_ATTACH,
891                                  0, CTDB_CTRL_FLAG_NOREPLY,
892                                  indata, NULL, NULL);
893
894         /* success */
895         return 0;
896 }
897
898
899 /*
900   attach to all existing persistent databases
901  */
902 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
903                                   const char *unhealthy_reason)
904 {
905         DIR *d;
906         struct dirent *de;
907
908         /* open the persistent db directory and scan it for files */
909         d = opendir(ctdb->db_directory_persistent);
910         if (d == NULL) {
911                 return 0;
912         }
913
914         while ((de=readdir(d))) {
915                 char *p, *s, *q;
916                 size_t len = strlen(de->d_name);
917                 uint32_t node;
918                 int invalid_name = 0;
919                 
920                 s = talloc_strdup(ctdb, de->d_name);
921                 CTDB_NO_MEMORY(ctdb, s);
922
923                 /* only accept names ending in .tdb */
924                 p = strstr(s, ".tdb.");
925                 if (len < 7 || p == NULL) {
926                         talloc_free(s);
927                         continue;
928                 }
929
930                 /* only accept names ending with .tdb. and any number of digits */
931                 q = p+5;
932                 while (*q != 0 && invalid_name == 0) {
933                         if (!isdigit(*q++)) {
934                                 invalid_name = 1;
935                         }
936                 }
937                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
938                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
939                         talloc_free(s);
940                         continue;
941                 }
942                 p[4] = 0;
943
944                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
945                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
946                         closedir(d);
947                         talloc_free(s);
948                         return -1;
949                 }
950
951                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
952
953                 talloc_free(s);
954         }
955         closedir(d);
956         return 0;
957 }
958
959 int ctdb_attach_databases(struct ctdb_context *ctdb)
960 {
961         int ret;
962         char *persistent_health_path = NULL;
963         char *unhealthy_reason = NULL;
964         bool first_try = true;
965
966         if (ctdb->db_directory == NULL) {
967                 ctdb->db_directory = VARDIR "/ctdb";
968         }
969         if (ctdb->db_directory_persistent == NULL) {
970                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
971         }
972         if (ctdb->db_directory_state == NULL) {
973                 ctdb->db_directory_state = VARDIR "/ctdb/state";
974         }
975
976         /* make sure the db directory exists */
977         ret = mkdir(ctdb->db_directory, 0700);
978         if (ret == -1 && errno != EEXIST) {
979                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
980                          ctdb->db_directory));
981                 return -1;
982         }
983
984         /* make sure the persistent db directory exists */
985         ret = mkdir(ctdb->db_directory_persistent, 0700);
986         if (ret == -1 && errno != EEXIST) {
987                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
988                          ctdb->db_directory_persistent));
989                 return -1;
990         }
991
992         /* make sure the internal state db directory exists */
993         ret = mkdir(ctdb->db_directory_state, 0700);
994         if (ret == -1 && errno != EEXIST) {
995                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
996                          ctdb->db_directory_state));
997                 return -1;
998         }
999
1000         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1001                                                  ctdb->db_directory_state,
1002                                                  PERSISTENT_HEALTH_TDB,
1003                                                  ctdb->pnn);
1004         if (persistent_health_path == NULL) {
1005                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1006                 return -1;
1007         }
1008
1009 again:
1010
1011         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1012                                                    0, TDB_DISALLOW_NESTING,
1013                                                    O_CREAT | O_RDWR, 0600);
1014         if (ctdb->db_persistent_health == NULL) {
1015                 struct tdb_wrap *tdb;
1016
1017                 if (!first_try) {
1018                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1019                                           persistent_health_path,
1020                                           errno,
1021                                           strerror(errno)));
1022                         talloc_free(persistent_health_path);
1023                         talloc_free(unhealthy_reason);
1024                         return -1;
1025                 }
1026                 first_try = false;
1027
1028                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1029                                                    persistent_health_path,
1030                                                    "was cleared after a failure",
1031                                                    "manual verification needed");
1032                 if (unhealthy_reason == NULL) {
1033                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1034                         talloc_free(persistent_health_path);
1035                         return -1;
1036                 }
1037
1038                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1039                                   persistent_health_path));
1040                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1041                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1042                                     O_CREAT | O_RDWR, 0600);
1043                 if (tdb) {
1044                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1045                                           persistent_health_path,
1046                                           errno,
1047                                           strerror(errno)));
1048                         talloc_free(persistent_health_path);
1049                         talloc_free(unhealthy_reason);
1050                         return -1;
1051                 }
1052
1053                 talloc_free(tdb);
1054                 goto again;
1055         }
1056         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1057         if (ret != 0) {
1058                 struct tdb_wrap *tdb;
1059
1060                 talloc_free(ctdb->db_persistent_health);
1061                 ctdb->db_persistent_health = NULL;
1062
1063                 if (!first_try) {
1064                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1065                                           persistent_health_path));
1066                         talloc_free(persistent_health_path);
1067                         talloc_free(unhealthy_reason);
1068                         return -1;
1069                 }
1070                 first_try = false;
1071
1072                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1073                                                    persistent_health_path,
1074                                                    "was cleared after a failure",
1075                                                    "manual verification needed");
1076                 if (unhealthy_reason == NULL) {
1077                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1078                         talloc_free(persistent_health_path);
1079                         return -1;
1080                 }
1081
1082                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1083                                   persistent_health_path));
1084                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1085                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1086                                     O_CREAT | O_RDWR, 0600);
1087                 if (tdb) {
1088                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1089                                           persistent_health_path,
1090                                           errno,
1091                                           strerror(errno)));
1092                         talloc_free(persistent_health_path);
1093                         talloc_free(unhealthy_reason);
1094                         return -1;
1095                 }
1096
1097                 talloc_free(tdb);
1098                 goto again;
1099         }
1100         talloc_free(persistent_health_path);
1101
1102         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1103         talloc_free(unhealthy_reason);
1104         if (ret != 0) {
1105                 return ret;
1106         }
1107
1108         return 0;
1109 }
1110
1111 /*
1112   called when a broadcast seqnum update comes in
1113  */
1114 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1115 {
1116         struct ctdb_db_context *ctdb_db;
1117         if (srcnode == ctdb->pnn) {
1118                 /* don't update ourselves! */
1119                 return 0;
1120         }
1121
1122         ctdb_db = find_ctdb_db(ctdb, db_id);
1123         if (!ctdb_db) {
1124                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1125                 return -1;
1126         }
1127
1128         if (ctdb_db->unhealthy_reason) {
1129                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1130                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1131                 return -1;
1132         }
1133
1134         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1135         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1136         return 0;
1137 }
1138
1139 /*
1140   timer to check for seqnum changes in a ltdb and propogate them
1141  */
1142 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1143                                    struct timeval t, void *p)
1144 {
1145         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1146         struct ctdb_context *ctdb = ctdb_db->ctdb;
1147         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1148         if (new_seqnum != ctdb_db->seqnum) {
1149                 /* something has changed - propogate it */
1150                 TDB_DATA data;
1151                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1152                 data.dsize = sizeof(uint32_t);
1153                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1154                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1155                                          data, NULL, NULL);             
1156         }
1157         ctdb_db->seqnum = new_seqnum;
1158
1159         /* setup a new timer */
1160         ctdb_db->seqnum_update =
1161                 event_add_timed(ctdb->ev, ctdb_db, 
1162                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1163                                 ctdb_ltdb_seqnum_check, ctdb_db);
1164 }
1165
1166 /*
1167   enable seqnum handling on this db
1168  */
1169 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1170 {
1171         struct ctdb_db_context *ctdb_db;
1172         ctdb_db = find_ctdb_db(ctdb, db_id);
1173         if (!ctdb_db) {
1174                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1175                 return -1;
1176         }
1177
1178         if (ctdb_db->seqnum_update == NULL) {
1179                 ctdb_db->seqnum_update =
1180                         event_add_timed(ctdb->ev, ctdb_db, 
1181                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1182                                         ctdb_ltdb_seqnum_check, ctdb_db);
1183         }
1184
1185         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1186         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1187         return 0;
1188 }
1189
1190 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1191 {
1192         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1193         struct ctdb_db_context *ctdb_db;
1194
1195         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1196         if (!ctdb_db) {
1197                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1198                 return 0;
1199         }
1200
1201         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1202                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1203                 return 0;
1204         }
1205
1206         ctdb_db->priority = db_prio->priority;
1207         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1208
1209         return 0;
1210 }
1211
1212