ATTACH_DB: simplify the code slightly and change the semantics to only
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "db_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
33
/*
  the "null" call handler: it ignores its argument and always reports
  success
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
        (void)call;     /* deliberately unused */
        return 0;
}
41
42 /*
43   this is a plain fetch procedure that all databases support
44 */
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
46 {
47         call->reply_data = &call->record_data;
48         return 0;
49 }
50
51
52
/*
  state carried from ctdb_ltdb_lock_requeue() to lock_fetch_callback()
  while a request waits for its chainlock
*/
struct lock_fetch_state {
        /* daemon context; used to read the current vnn_map generation */
        struct ctdb_context *ctdb;

        /* function the deferred packet is handed back to, and the
           opaque first argument passed to it */
        void (*recv_pkt)(void *ctx, struct ctdb_req_header *pkt);
        void *recv_context;

        /* the deferred request packet */
        struct ctdb_req_header *hdr;

        /* vnn_map generation at the time the request was deferred */
        uint32_t generation;

        /* when true, replay the packet even if the generation changed */
        bool ignore_generation;
};
61
62 /*
63   called when we should retry the operation
64  */
65 static void lock_fetch_callback(void *p)
66 {
67         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
68         if (!state->ignore_generation &&
69             state->generation != state->ctdb->vnn_map->generation) {
70                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71                 talloc_free(state->hdr);
72                 return;
73         }
74         state->recv_pkt(state->recv_context, state->hdr);
75         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
76 }
77
78
79 /*
80   do a non-blocking ltdb_lock, deferring this ctdb request until we
81   have the chainlock
82
83   It does the following:
84
85    1) tries to get the chainlock. If it succeeds, then it returns 0
86
87    2) if it fails to get a chainlock immediately then it sets up a
88    non-blocking chainlock via ctdb_lockwait, and when it gets the
89    chainlock it re-submits this ctdb request to the main packet
90    receive function
91
92    This effectively queues all ctdb requests that cannot be
93    immediately satisfied until it can get the lock. This means that
94    the main ctdb daemon will not block waiting for a chainlock held by
95    a client
96
97    There are 3 possible return values:
98
99        0:    means that it got the lock immediately.
100       -1:    means that it failed to get the lock, and won't retry
101       -2:    means that it failed to get the lock immediately, but will retry
102  */
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
104                            TDB_DATA key, struct ctdb_req_header *hdr,
105                            void (*recv_pkt)(void *, struct ctdb_req_header *),
106                            void *recv_context, bool ignore_generation)
107 {
108         int ret;
109         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110         struct lockwait_handle *h;
111         struct lock_fetch_state *state;
112         
113         ret = tdb_chainlock_nonblock(tdb, key);
114
115         if (ret != 0 &&
116             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117                 /* a hard failure - don't try again */
118                 return -1;
119         }
120
121         /* when torturing, ensure we test the contended path */
122         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
123             random() % 5 == 0) {
124                 ret = -1;
125                 tdb_chainunlock(tdb, key);
126         }
127
128         /* first the non-contended path */
129         if (ret == 0) {
130                 return 0;
131         }
132
133         state = talloc(hdr, struct lock_fetch_state);
134         state->ctdb = ctdb_db->ctdb;
135         state->hdr = hdr;
136         state->recv_pkt = recv_pkt;
137         state->recv_context = recv_context;
138         state->generation = ctdb_db->ctdb->vnn_map->generation;
139         state->ignore_generation = ignore_generation;
140
141         /* now the contended path */
142         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
143         if (h == NULL) {
144                 return -1;
145         }
146
147         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
148            so it won't be freed yet */
149         talloc_steal(state, hdr);
150         talloc_steal(state, h);
151
152         /* now tell the caller than we will retry asynchronously */
153         return -2;
154 }
155
156 /*
157   a varient of ctdb_ltdb_lock_requeue that also fetches the record
158  */
159 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
160                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
161                                  struct ctdb_req_header *hdr, TDB_DATA *data,
162                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
163                                  void *recv_context, bool ignore_generation)
164 {
165         int ret;
166
167         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
168                                      recv_context, ignore_generation);
169         if (ret == 0) {
170                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
171                 if (ret != 0) {
172                         int uret;
173                         uret = ctdb_ltdb_unlock(ctdb_db, key);
174                         if (uret != 0) {
175                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
176                         }
177                 }
178         }
179         return ret;
180 }
181
182
183 /*
184   paraoid check to see if the db is empty
185  */
186 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
187 {
188         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
189         int count = tdb_traverse_read(tdb, NULL, NULL);
190         if (count != 0) {
191                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
192                          ctdb_db->db_path));
193                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
194         }
195 }
196
197 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
198                                 struct ctdb_db_context *ctdb_db)
199 {
200         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
201         char *old;
202         char *reason = NULL;
203         TDB_DATA key;
204         TDB_DATA val;
205
206         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
207         key.dsize = strlen(ctdb_db->db_name);
208
209         old = ctdb_db->unhealthy_reason;
210         ctdb_db->unhealthy_reason = NULL;
211
212         val = tdb_fetch(tdb, key);
213         if (val.dsize > 0) {
214                 reason = talloc_strndup(ctdb_db,
215                                         (const char *)val.dptr,
216                                         val.dsize);
217                 if (reason == NULL) {
218                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
219                                            (int)val.dsize));
220                         ctdb_db->unhealthy_reason = old;
221                         free(val.dptr);
222                         return -1;
223                 }
224         }
225
226         if (val.dptr) {
227                 free(val.dptr);
228         }
229
230         talloc_free(old);
231         ctdb_db->unhealthy_reason = reason;
232         return 0;
233 }
234
235 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
236                                   struct ctdb_db_context *ctdb_db,
237                                   const char *given_reason,/* NULL means healthy */
238                                   int num_healthy_nodes)
239 {
240         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
241         int ret;
242         TDB_DATA key;
243         TDB_DATA val;
244         char *new_reason = NULL;
245         char *old_reason = NULL;
246
247         ret = tdb_transaction_start(tdb);
248         if (ret != 0) {
249                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
250                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
251                 return -1;
252         }
253
254         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
255         if (ret != 0) {
256                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
257                                    ctdb_db->db_name, ret));
258                 return -1;
259         }
260         old_reason = ctdb_db->unhealthy_reason;
261
262         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
263         key.dsize = strlen(ctdb_db->db_name);
264
265         if (given_reason) {
266                 new_reason = talloc_strdup(ctdb_db, given_reason);
267                 if (new_reason == NULL) {
268                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
269                                           given_reason));
270                         return -1;
271                 }
272         } else if (old_reason && num_healthy_nodes == 0) {
273                 /*
274                  * If the reason indicates ok, but there where no healthy nodes
275                  * available, that it means, we have not recovered valid content
276                  * of the db. So if there's an old reason, prefix it with
277                  * "NO-HEALTHY-NODES - "
278                  */
279                 const char *prefix;
280
281 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
282                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
283                 if (ret != 0) {
284                         prefix = _TMP_PREFIX;
285                 } else {
286                         prefix = "";
287                 }
288                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
289                                          prefix, old_reason);
290                 if (new_reason == NULL) {
291                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
292                                           prefix, old_reason));
293                         return -1;
294                 }
295 #undef _TMP_PREFIX
296         }
297
298         if (new_reason) {
299                 val.dptr = discard_const_p(uint8_t, new_reason);
300                 val.dsize = strlen(new_reason);
301
302                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
303                 if (ret != 0) {
304                         tdb_transaction_cancel(tdb);
305                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
306                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
307                                            ret, tdb_errorstr(tdb)));
308                         talloc_free(new_reason);
309                         return -1;
310                 }
311                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
312                                    ctdb_db->db_name, new_reason));
313         } else if (old_reason) {
314                 ret = tdb_delete(tdb, key);
315                 if (ret != 0) {
316                         tdb_transaction_cancel(tdb);
317                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
318                                            tdb_name(tdb), ctdb_db->db_name,
319                                            ret, tdb_errorstr(tdb)));
320                         talloc_free(new_reason);
321                         return -1;
322                 }
323                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
324                                    ctdb_db->db_name));
325         }
326
327         ret = tdb_transaction_commit(tdb);
328         if (ret != TDB_SUCCESS) {
329                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
330                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
331                 talloc_free(new_reason);
332                 return -1;
333         }
334
335         talloc_free(old_reason);
336         ctdb_db->unhealthy_reason = new_reason;
337
338         return 0;
339 }
340
341 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
342                                      struct ctdb_db_context *ctdb_db)
343 {
344         time_t now = time(NULL);
345         char *new_path;
346         char *new_reason;
347         int ret;
348         struct tm *tm;
349
350         tm = gmtime(&now);
351
352         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
353         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
354                                    "%04u%02u%02u%02u%02u%02u.0Z",
355                                    ctdb_db->db_path,
356                                    tm->tm_year+1900, tm->tm_mon+1,
357                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
358                                    tm->tm_sec);
359         if (new_path == NULL) {
360                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
361                 return -1;
362         }
363
364         new_reason = talloc_asprintf(ctdb_db,
365                                      "ERROR - Backup of corrupted TDB in '%s'",
366                                      new_path);
367         if (new_reason == NULL) {
368                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
369                 return -1;
370         }
371         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
372         talloc_free(new_reason);
373         if (ret != 0) {
374                 DEBUG(DEBUG_CRIT,(__location__
375                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
376                                  ctdb_db->db_path));
377                 return -1;
378         }
379
380         ret = rename(ctdb_db->db_path, new_path);
381         if (ret != 0) {
382                 DEBUG(DEBUG_CRIT,(__location__
383                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
384                                   ctdb_db->db_path, new_path,
385                                   errno, strerror(errno)));
386                 talloc_free(new_path);
387                 return -1;
388         }
389
390         DEBUG(DEBUG_CRIT,(__location__
391                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
392                          ctdb_db->db_path, new_path));
393         talloc_free(new_path);
394         return 0;
395 }
396
397 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
398 {
399         struct ctdb_db_context *ctdb_db;
400         int ret;
401         int ok = 0;
402         int fail = 0;
403
404         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
405                 if (!ctdb_db->persistent) {
406                         continue;
407                 }
408
409                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
410                 if (ret != 0) {
411                         DEBUG(DEBUG_ALERT,(__location__
412                                            " load persistent health for '%s' failed\n",
413                                            ctdb_db->db_path));
414                         return -1;
415                 }
416
417                 if (ctdb_db->unhealthy_reason == NULL) {
418                         ok++;
419                         DEBUG(DEBUG_INFO,(__location__
420                                    " persistent db '%s' healthy\n",
421                                    ctdb_db->db_path));
422                         continue;
423                 }
424
425                 fail++;
426                 DEBUG(DEBUG_ALERT,(__location__
427                                    " persistent db '%s' unhealthy: %s\n",
428                                    ctdb_db->db_path,
429                                    ctdb_db->unhealthy_reason));
430         }
431         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
432               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
433                ok, fail));
434
435         if (fail != 0) {
436                 return -1;
437         }
438
439         return 0;
440 }
441
442
443 /*
444   mark a database - as healthy
445  */
446 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
447 {
448         uint32_t db_id = *(uint32_t *)indata.dptr;
449         struct ctdb_db_context *ctdb_db;
450         int ret;
451         bool may_recover = false;
452
453         ctdb_db = find_ctdb_db(ctdb, db_id);
454         if (!ctdb_db) {
455                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
456                 return -1;
457         }
458
459         if (ctdb_db->unhealthy_reason) {
460                 may_recover = true;
461         }
462
463         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
464         if (ret != 0) {
465                 DEBUG(DEBUG_ERR,(__location__
466                                  " ctdb_update_persistent_health(%s) failed\n",
467                                  ctdb_db->db_name));
468                 return -1;
469         }
470
471         if (may_recover && !ctdb->done_startup) {
472                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
473                                   ctdb_db->db_name));
474                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
475         }
476
477         return 0;
478 }
479
480 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
481                                    TDB_DATA indata,
482                                    TDB_DATA *outdata)
483 {
484         uint32_t db_id = *(uint32_t *)indata.dptr;
485         struct ctdb_db_context *ctdb_db;
486         int ret;
487
488         ctdb_db = find_ctdb_db(ctdb, db_id);
489         if (!ctdb_db) {
490                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
491                 return -1;
492         }
493
494         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
495         if (ret != 0) {
496                 DEBUG(DEBUG_ERR,(__location__
497                                  " ctdb_load_persistent_health(%s) failed\n",
498                                  ctdb_db->db_name));
499                 return -1;
500         }
501
502         *outdata = tdb_null;
503         if (ctdb_db->unhealthy_reason) {
504                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
505                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
506         }
507
508         return 0;
509 }
510
511 /*
512   attach to a database, handling both persistent and non-persistent databases
513   return 0 on success, -1 on failure
514  */
515 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
516                              bool persistent, const char *unhealthy_reason,
517                              bool jenkinshash)
518 {
519         struct ctdb_db_context *ctdb_db, *tmp_db;
520         int ret;
521         struct TDB_DATA key;
522         unsigned tdb_flags;
523         int mode = 0600;
524         int remaining_tries = 0;
525
526         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
527         CTDB_NO_MEMORY(ctdb, ctdb_db);
528
529         ctdb_db->priority = 1;
530         ctdb_db->ctdb = ctdb;
531         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
532         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
533
534         key.dsize = strlen(db_name)+1;
535         key.dptr  = discard_const(db_name);
536         ctdb_db->db_id = ctdb_hash(&key);
537         ctdb_db->persistent = persistent;
538
539         /* check for hash collisions */
540         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
541                 if (tmp_db->db_id == ctdb_db->db_id) {
542                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
543                                  tmp_db->db_id, db_name, tmp_db->db_name));
544                         talloc_free(ctdb_db);
545                         return -1;
546                 }
547         }
548
549         if (persistent) {
550                 if (unhealthy_reason) {
551                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
552                                                             unhealthy_reason, 0);
553                         if (ret != 0) {
554                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
555                                                    ctdb_db->db_name, unhealthy_reason, ret));
556                                 talloc_free(ctdb_db);
557                                 return -1;
558                         }
559                 }
560
561                 if (ctdb->max_persistent_check_errors > 0) {
562                         remaining_tries = 1;
563                 }
564                 if (ctdb->done_startup) {
565                         remaining_tries = 0;
566                 }
567
568                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
569                 if (ret != 0) {
570                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
571                                    ctdb_db->db_name, ret));
572                         talloc_free(ctdb_db);
573                         return -1;
574                 }
575         }
576
577         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
578                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
579                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
580                 talloc_free(ctdb_db);
581                 return -1;
582         }
583
584         if (ctdb_db->unhealthy_reason) {
585                 /* this is just a warning, but we want that in the log file! */
586                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
587                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
588         }
589
590         /* open the database */
591         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
592                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
593                                            db_name, ctdb->pnn);
594
595         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
596         if (ctdb->valgrinding) {
597                 tdb_flags |= TDB_NOMMAP;
598         }
599         tdb_flags |= TDB_DISALLOW_NESTING;
600         if (jenkinshash) {
601                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
602         }
603
604 again:
605         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
606                                       ctdb->tunable.database_hash_size, 
607                                       tdb_flags, 
608                                       O_CREAT|O_RDWR, mode);
609         if (ctdb_db->ltdb == NULL) {
610                 struct stat st;
611                 int saved_errno = errno;
612
613                 if (!persistent) {
614                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
615                                           ctdb_db->db_path,
616                                           saved_errno,
617                                           strerror(saved_errno)));
618                         talloc_free(ctdb_db);
619                         return -1;
620                 }
621
622                 if (remaining_tries == 0) {
623                         DEBUG(DEBUG_CRIT,(__location__
624                                           "Failed to open persistent tdb '%s': %d - %s\n",
625                                           ctdb_db->db_path,
626                                           saved_errno,
627                                           strerror(saved_errno)));
628                         talloc_free(ctdb_db);
629                         return -1;
630                 }
631
632                 ret = stat(ctdb_db->db_path, &st);
633                 if (ret != 0) {
634                         DEBUG(DEBUG_CRIT,(__location__
635                                           "Failed to open persistent tdb '%s': %d - %s\n",
636                                           ctdb_db->db_path,
637                                           saved_errno,
638                                           strerror(saved_errno)));
639                         talloc_free(ctdb_db);
640                         return -1;
641                 }
642
643                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
644                 if (ret != 0) {
645                         DEBUG(DEBUG_CRIT,(__location__
646                                           "Failed to open persistent tdb '%s': %d - %s\n",
647                                           ctdb_db->db_path,
648                                           saved_errno,
649                                           strerror(saved_errno)));
650                         talloc_free(ctdb_db);
651                         return -1;
652                 }
653
654                 remaining_tries--;
655                 mode = st.st_mode;
656                 goto again;
657         }
658
659         if (!persistent) {
660                 ctdb_check_db_empty(ctdb_db);
661         } else {
662                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
663                 if (ret != 0) {
664                         int fd;
665                         struct stat st;
666
667                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
668                                           ctdb_db->db_path, ret,
669                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
670                         if (remaining_tries == 0) {
671                                 talloc_free(ctdb_db);
672                                 return -1;
673                         }
674
675                         fd = tdb_fd(ctdb_db->ltdb->tdb);
676                         ret = fstat(fd, &st);
677                         if (ret != 0) {
678                                 DEBUG(DEBUG_CRIT,(__location__
679                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
680                                                   ctdb_db->db_path,
681                                                   errno,
682                                                   strerror(errno)));
683                                 talloc_free(ctdb_db);
684                                 return -1;
685                         }
686
687                         /* close the TDB */
688                         talloc_free(ctdb_db->ltdb);
689                         ctdb_db->ltdb = NULL;
690
691                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
692                         if (ret != 0) {
693                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
694                                                   ctdb_db->db_path));
695                                 talloc_free(ctdb_db);
696                                 return -1;
697                         }
698
699                         remaining_tries--;
700                         mode = st.st_mode;
701                         goto again;
702                 }
703         }
704
705         DLIST_ADD(ctdb->db_list, ctdb_db);
706
707         /* setting this can help some high churn databases */
708         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
709
710         /* 
711            all databases support the "null" function. we need this in
712            order to do forced migration of records
713         */
714         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
715         if (ret != 0) {
716                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
717                 talloc_free(ctdb_db);
718                 return -1;
719         }
720
721         /* 
722            all databases support the "fetch" function. we need this
723            for efficient Samba3 ctdb fetch
724         */
725         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
726         if (ret != 0) {
727                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
728                 talloc_free(ctdb_db);
729                 return -1;
730         }
731
732         ret = ctdb_vacuum_init(ctdb_db);
733         if (ret != 0) {
734                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
735                                   "database '%s'\n", ctdb_db->db_name));
736                 talloc_free(ctdb_db);
737                 return -1;
738         }
739
740
741         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
742         
743         /* success */
744         return 0;
745 }
746
747
/*
  one attach control that has been deferred; entries are linked into
  ctdb->deferred_attach
*/
struct ctdb_deferred_attach_context {
        /* list linkage */
        struct ctdb_deferred_attach_context *next;
        struct ctdb_deferred_attach_context *prev;

        /* owning daemon context */
        struct ctdb_context *ctdb;

        /* the deferred control request */
        struct ctdb_req_control *c;
};
753
754
755 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
756 {
757         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
758
759         return 0;
760 }
761
762 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
763 {
764         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
765         struct ctdb_context *ctdb = da_ctx->ctdb;
766
767         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
768         talloc_free(da_ctx);
769 }
770
771 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
772 {
773         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
774         struct ctdb_context *ctdb = da_ctx->ctdb;
775
776         /* This talloc-steals the packet ->c */
777         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
778         talloc_free(da_ctx);
779 }
780
781 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
782 {
783         struct ctdb_deferred_attach_context *da_ctx;
784
785         /* call it from the main event loop as soon as the current event 
786            finishes.
787          */
788         while ((da_ctx = ctdb->deferred_attach) != NULL) {
789                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
790                 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
791         }
792
793         return 0;
794 }
795
796 /*
797   a client has asked to attach a new database
798  */
799 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
800                                TDB_DATA *outdata, uint64_t tdb_flags, 
801                                bool persistent, uint32_t client_id,
802                                struct ctdb_req_control *c,
803                                bool *async_reply)
804 {
805         const char *db_name = (const char *)indata.dptr;
806         struct ctdb_db_context *db;
807         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
808         struct ctdb_client *client = NULL;
809
810         /* dont allow any local clients to attach while we are in recovery mode
811          * except for the recovery daemon.
812          * allow all attach from the network since these are always from remote
813          * recovery daemons.
814          */
815         if (client_id != 0) {
816                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
817         }
818         if (client != NULL) {
819                 /* If the node is inactive it is not part of the cluster
820                    and we should not allow clients to attach to any
821                    databases
822                 */
823                 if (node->flags & NODE_FLAGS_INACTIVE) {
824                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
825                         return -1;
826                 }
827
828                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
829                  && client->pid != ctdb->recoverd_pid) {
830                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
831
832                         if (da_ctx == NULL) {
833                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
834                                 return -1;
835                         }
836
837                         da_ctx->ctdb = ctdb;
838                         da_ctx->c = talloc_steal(da_ctx, c);
839                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
840                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
841
842                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
843
844                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
845                         *async_reply = true;
846                         return 0;
847                 }
848         }
849
850         /* the client can optionally pass additional tdb flags, but we
851            only allow a subset of those on the database in ctdb. Note
852            that tdb_flags is passed in via the (otherwise unused)
853            srvid to the attach control */
854         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
855
856         /* see if we already have this name */
857         db = ctdb_db_handle(ctdb, db_name);
858         if (db) {
859                 outdata->dptr  = (uint8_t *)&db->db_id;
860                 outdata->dsize = sizeof(db->db_id);
861                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
862                 return 0;
863         }
864
865         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
866                 return -1;
867         }
868
869         db = ctdb_db_handle(ctdb, db_name);
870         if (!db) {
871                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
872                 return -1;
873         }
874
875         /* remember the flags the client has specified */
876         tdb_add_flags(db->ltdb->tdb, tdb_flags);
877
878         outdata->dptr  = (uint8_t *)&db->db_id;
879         outdata->dsize = sizeof(db->db_id);
880
881         /* Try to ensure it's locked in mem */
882         ctdb_lockdown_memory(ctdb);
883
884         /* tell all the other nodes about this database */
885         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
886                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
887                                                 CTDB_CONTROL_DB_ATTACH,
888                                  0, CTDB_CTRL_FLAG_NOREPLY,
889                                  indata, NULL, NULL);
890
891         /* success */
892         return 0;
893 }
894
895
896 /*
897   attach to all existing persistent databases
898  */
899 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
900                                   const char *unhealthy_reason)
901 {
902         DIR *d;
903         struct dirent *de;
904
905         /* open the persistent db directory and scan it for files */
906         d = opendir(ctdb->db_directory_persistent);
907         if (d == NULL) {
908                 return 0;
909         }
910
911         while ((de=readdir(d))) {
912                 char *p, *s, *q;
913                 size_t len = strlen(de->d_name);
914                 uint32_t node;
915                 int invalid_name = 0;
916                 
917                 s = talloc_strdup(ctdb, de->d_name);
918                 CTDB_NO_MEMORY(ctdb, s);
919
920                 /* only accept names ending in .tdb */
921                 p = strstr(s, ".tdb.");
922                 if (len < 7 || p == NULL) {
923                         talloc_free(s);
924                         continue;
925                 }
926
927                 /* only accept names ending with .tdb. and any number of digits */
928                 q = p+5;
929                 while (*q != 0 && invalid_name == 0) {
930                         if (!isdigit(*q++)) {
931                                 invalid_name = 1;
932                         }
933                 }
934                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
935                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
936                         talloc_free(s);
937                         continue;
938                 }
939                 p[4] = 0;
940
941                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
942                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
943                         closedir(d);
944                         talloc_free(s);
945                         return -1;
946                 }
947
948                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
949
950                 talloc_free(s);
951         }
952         closedir(d);
953         return 0;
954 }
955
956 int ctdb_attach_databases(struct ctdb_context *ctdb)
957 {
958         int ret;
959         char *persistent_health_path = NULL;
960         char *unhealthy_reason = NULL;
961         bool first_try = true;
962
963         if (ctdb->db_directory == NULL) {
964                 ctdb->db_directory = VARDIR "/ctdb";
965         }
966         if (ctdb->db_directory_persistent == NULL) {
967                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
968         }
969         if (ctdb->db_directory_state == NULL) {
970                 ctdb->db_directory_state = VARDIR "/ctdb/state";
971         }
972
973         /* make sure the db directory exists */
974         ret = mkdir(ctdb->db_directory, 0700);
975         if (ret == -1 && errno != EEXIST) {
976                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
977                          ctdb->db_directory));
978                 return -1;
979         }
980
981         /* make sure the persistent db directory exists */
982         ret = mkdir(ctdb->db_directory_persistent, 0700);
983         if (ret == -1 && errno != EEXIST) {
984                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
985                          ctdb->db_directory_persistent));
986                 return -1;
987         }
988
989         /* make sure the internal state db directory exists */
990         ret = mkdir(ctdb->db_directory_state, 0700);
991         if (ret == -1 && errno != EEXIST) {
992                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
993                          ctdb->db_directory_state));
994                 return -1;
995         }
996
997         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
998                                                  ctdb->db_directory_state,
999                                                  PERSISTENT_HEALTH_TDB,
1000                                                  ctdb->pnn);
1001         if (persistent_health_path == NULL) {
1002                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1003                 return -1;
1004         }
1005
1006 again:
1007
1008         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1009                                                    0, TDB_DISALLOW_NESTING,
1010                                                    O_CREAT | O_RDWR, 0600);
1011         if (ctdb->db_persistent_health == NULL) {
1012                 struct tdb_wrap *tdb;
1013
1014                 if (!first_try) {
1015                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1016                                           persistent_health_path,
1017                                           errno,
1018                                           strerror(errno)));
1019                         talloc_free(persistent_health_path);
1020                         talloc_free(unhealthy_reason);
1021                         return -1;
1022                 }
1023                 first_try = false;
1024
1025                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1026                                                    persistent_health_path,
1027                                                    "was cleared after a failure",
1028                                                    "manual verification needed");
1029                 if (unhealthy_reason == NULL) {
1030                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1031                         talloc_free(persistent_health_path);
1032                         return -1;
1033                 }
1034
1035                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1036                                   persistent_health_path));
1037                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1038                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1039                                     O_CREAT | O_RDWR, 0600);
1040                 if (tdb) {
1041                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1042                                           persistent_health_path,
1043                                           errno,
1044                                           strerror(errno)));
1045                         talloc_free(persistent_health_path);
1046                         talloc_free(unhealthy_reason);
1047                         return -1;
1048                 }
1049
1050                 talloc_free(tdb);
1051                 goto again;
1052         }
1053         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1054         if (ret != 0) {
1055                 struct tdb_wrap *tdb;
1056
1057                 talloc_free(ctdb->db_persistent_health);
1058                 ctdb->db_persistent_health = NULL;
1059
1060                 if (!first_try) {
1061                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1062                                           persistent_health_path));
1063                         talloc_free(persistent_health_path);
1064                         talloc_free(unhealthy_reason);
1065                         return -1;
1066                 }
1067                 first_try = false;
1068
1069                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1070                                                    persistent_health_path,
1071                                                    "was cleared after a failure",
1072                                                    "manual verification needed");
1073                 if (unhealthy_reason == NULL) {
1074                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1075                         talloc_free(persistent_health_path);
1076                         return -1;
1077                 }
1078
1079                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1080                                   persistent_health_path));
1081                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1082                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1083                                     O_CREAT | O_RDWR, 0600);
1084                 if (tdb) {
1085                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1086                                           persistent_health_path,
1087                                           errno,
1088                                           strerror(errno)));
1089                         talloc_free(persistent_health_path);
1090                         talloc_free(unhealthy_reason);
1091                         return -1;
1092                 }
1093
1094                 talloc_free(tdb);
1095                 goto again;
1096         }
1097         talloc_free(persistent_health_path);
1098
1099         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1100         talloc_free(unhealthy_reason);
1101         if (ret != 0) {
1102                 return ret;
1103         }
1104
1105         return 0;
1106 }
1107
1108 /*
1109   called when a broadcast seqnum update comes in
1110  */
1111 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1112 {
1113         struct ctdb_db_context *ctdb_db;
1114         if (srcnode == ctdb->pnn) {
1115                 /* don't update ourselves! */
1116                 return 0;
1117         }
1118
1119         ctdb_db = find_ctdb_db(ctdb, db_id);
1120         if (!ctdb_db) {
1121                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1122                 return -1;
1123         }
1124
1125         if (ctdb_db->unhealthy_reason) {
1126                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1127                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1128                 return -1;
1129         }
1130
1131         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1132         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1133         return 0;
1134 }
1135
1136 /*
1137   timer to check for seqnum changes in a ltdb and propogate them
1138  */
1139 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1140                                    struct timeval t, void *p)
1141 {
1142         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1143         struct ctdb_context *ctdb = ctdb_db->ctdb;
1144         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1145         if (new_seqnum != ctdb_db->seqnum) {
1146                 /* something has changed - propogate it */
1147                 TDB_DATA data;
1148                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1149                 data.dsize = sizeof(uint32_t);
1150                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1151                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1152                                          data, NULL, NULL);             
1153         }
1154         ctdb_db->seqnum = new_seqnum;
1155
1156         /* setup a new timer */
1157         ctdb_db->seqnum_update =
1158                 event_add_timed(ctdb->ev, ctdb_db, 
1159                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1160                                 ctdb_ltdb_seqnum_check, ctdb_db);
1161 }
1162
1163 /*
1164   enable seqnum handling on this db
1165  */
1166 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1167 {
1168         struct ctdb_db_context *ctdb_db;
1169         ctdb_db = find_ctdb_db(ctdb, db_id);
1170         if (!ctdb_db) {
1171                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1172                 return -1;
1173         }
1174
1175         if (ctdb_db->seqnum_update == NULL) {
1176                 ctdb_db->seqnum_update =
1177                         event_add_timed(ctdb->ev, ctdb_db, 
1178                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1179                                         ctdb_ltdb_seqnum_check, ctdb_db);
1180         }
1181
1182         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1183         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1184         return 0;
1185 }
1186
1187 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1188 {
1189         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1190         struct ctdb_db_context *ctdb_db;
1191
1192         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1193         if (!ctdb_db) {
1194                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1195                 return 0;
1196         }
1197
1198         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1199                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1200                 return 0;
1201         }
1202
1203         ctdb_db->priority = db_prio->priority;
1204         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1205
1206         return 0;
1207 }
1208
1209