In the example script to remove all ip addresses after a ctdb crash,
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "db_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
33
/*
  The "null" call handler installed for every attached database.

  It does not look at the call information at all and always reports
  success by returning 0.
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
	(void)call;	/* intentionally unused */
	return 0;
}
41
42 /*
43   this is a plain fetch procedure that all databases support
44 */
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
46 {
47         call->reply_data = &call->record_data;
48         return 0;
49 }
50
51
52
/*
  State kept for a request that is waiting for a chainlock.
  It is filled in by ctdb_ltdb_lock_requeue() and consumed by
  lock_fetch_callback().
*/
struct lock_fetch_state {
	/* daemon context; its vnn_map generation is compared on retry */
	struct ctdb_context *ctdb;

	/* function the packet is handed back to once the lock is available */
	void (*recv_pkt)(void *, struct ctdb_req_header *);

	/* first argument passed to recv_pkt */
	void *recv_context;

	/* the deferred request packet */
	struct ctdb_req_header *hdr;

	/* vnn_map generation at the time the request was queued */
	uint32_t generation;

	/* when true, the packet is retried even if the generation changed */
	bool ignore_generation;
};
61
62 /*
63   called when we should retry the operation
64  */
65 static void lock_fetch_callback(void *p)
66 {
67         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
68         if (!state->ignore_generation &&
69             state->generation != state->ctdb->vnn_map->generation) {
70                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71                 talloc_free(state->hdr);
72                 return;
73         }
74         state->recv_pkt(state->recv_context, state->hdr);
75         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
76 }
77
78
79 /*
80   do a non-blocking ltdb_lock, deferring this ctdb request until we
81   have the chainlock
82
83   It does the following:
84
85    1) tries to get the chainlock. If it succeeds, then it returns 0
86
87    2) if it fails to get a chainlock immediately then it sets up a
88    non-blocking chainlock via ctdb_lockwait, and when it gets the
89    chainlock it re-submits this ctdb request to the main packet
90    receive function
91
92    This effectively queues all ctdb requests that cannot be
93    immediately satisfied until it can get the lock. This means that
94    the main ctdb daemon will not block waiting for a chainlock held by
95    a client
96
97    There are 3 possible return values:
98
99        0:    means that it got the lock immediately.
100       -1:    means that it failed to get the lock, and won't retry
101       -2:    means that it failed to get the lock immediately, but will retry
102  */
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
104                            TDB_DATA key, struct ctdb_req_header *hdr,
105                            void (*recv_pkt)(void *, struct ctdb_req_header *),
106                            void *recv_context, bool ignore_generation)
107 {
108         int ret;
109         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110         struct lockwait_handle *h;
111         struct lock_fetch_state *state;
112         
113         ret = tdb_chainlock_nonblock(tdb, key);
114
115         if (ret != 0 &&
116             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117                 /* a hard failure - don't try again */
118                 return -1;
119         }
120
121         /* when torturing, ensure we test the contended path */
122         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
123             random() % 5 == 0) {
124                 ret = -1;
125                 tdb_chainunlock(tdb, key);
126         }
127
128         /* first the non-contended path */
129         if (ret == 0) {
130                 return 0;
131         }
132
133         state = talloc(hdr, struct lock_fetch_state);
134         state->ctdb = ctdb_db->ctdb;
135         state->hdr = hdr;
136         state->recv_pkt = recv_pkt;
137         state->recv_context = recv_context;
138         state->generation = ctdb_db->ctdb->vnn_map->generation;
139         state->ignore_generation = ignore_generation;
140
141         /* now the contended path */
142         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
143         if (h == NULL) {
144                 tdb_chainunlock(tdb, key);
145                 return -1;
146         }
147
148         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
149            so it won't be freed yet */
150         talloc_steal(state, hdr);
151         talloc_steal(state, h);
152
153         /* now tell the caller than we will retry asynchronously */
154         return -2;
155 }
156
157 /*
158   a varient of ctdb_ltdb_lock_requeue that also fetches the record
159  */
160 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
161                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
162                                  struct ctdb_req_header *hdr, TDB_DATA *data,
163                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
164                                  void *recv_context, bool ignore_generation)
165 {
166         int ret;
167
168         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
169                                      recv_context, ignore_generation);
170         if (ret == 0) {
171                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
172                 if (ret != 0) {
173                         ctdb_ltdb_unlock(ctdb_db, key);
174                 }
175         }
176         return ret;
177 }
178
179
180 /*
181   paraoid check to see if the db is empty
182  */
183 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
184 {
185         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
186         int count = tdb_traverse_read(tdb, NULL, NULL);
187         if (count != 0) {
188                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
189                          ctdb_db->db_path));
190                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
191         }
192 }
193
194 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
195                                 struct ctdb_db_context *ctdb_db)
196 {
197         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
198         char *old;
199         char *reason = NULL;
200         TDB_DATA key;
201         TDB_DATA val;
202
203         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
204         key.dsize = strlen(ctdb_db->db_name);
205
206         old = ctdb_db->unhealthy_reason;
207         ctdb_db->unhealthy_reason = NULL;
208
209         val = tdb_fetch(tdb, key);
210         if (val.dsize > 0) {
211                 reason = talloc_strndup(ctdb_db,
212                                         (const char *)val.dptr,
213                                         val.dsize);
214                 if (reason == NULL) {
215                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
216                                            (int)val.dsize));
217                         ctdb_db->unhealthy_reason = old;
218                         free(val.dptr);
219                         return -1;
220                 }
221         }
222
223         if (val.dptr) {
224                 free(val.dptr);
225         }
226
227         talloc_free(old);
228         ctdb_db->unhealthy_reason = reason;
229         return 0;
230 }
231
232 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
233                                   struct ctdb_db_context *ctdb_db,
234                                   const char *given_reason,/* NULL means healthy */
235                                   int num_healthy_nodes)
236 {
237         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
238         int ret;
239         TDB_DATA key;
240         TDB_DATA val;
241         char *new_reason = NULL;
242         char *old_reason = NULL;
243
244         ret = tdb_transaction_start(tdb);
245         if (ret != 0) {
246                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
247                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
248                 return -1;
249         }
250
251         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
252         if (ret != 0) {
253                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
254                                    ctdb_db->db_name, ret));
255                 return -1;
256         }
257         old_reason = ctdb_db->unhealthy_reason;
258
259         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
260         key.dsize = strlen(ctdb_db->db_name);
261
262         if (given_reason) {
263                 new_reason = talloc_strdup(ctdb_db, given_reason);
264                 if (new_reason == NULL) {
265                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
266                                           given_reason));
267                         return -1;
268                 }
269         } else if (old_reason && num_healthy_nodes == 0) {
270                 /*
271                  * If the reason indicates ok, but there where no healthy nodes
272                  * available, that it means, we have not recovered valid content
273                  * of the db. So if there's an old reason, prefix it with
274                  * "NO-HEALTHY-NODES - "
275                  */
276                 const char *prefix;
277
278 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
279                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
280                 if (ret != 0) {
281                         prefix = _TMP_PREFIX;
282                 } else {
283                         prefix = "";
284                 }
285                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
286                                          prefix, old_reason);
287                 if (new_reason == NULL) {
288                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
289                                           prefix, old_reason));
290                         return -1;
291                 }
292 #undef _TMP_PREFIX
293         }
294
295         if (new_reason) {
296                 val.dptr = discard_const_p(uint8_t, new_reason);
297                 val.dsize = strlen(new_reason);
298
299                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
300                 if (ret != 0) {
301                         tdb_transaction_cancel(tdb);
302                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
303                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
304                                            ret, tdb_errorstr(tdb)));
305                         talloc_free(new_reason);
306                         return -1;
307                 }
308                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
309                                    ctdb_db->db_name, new_reason));
310         } else if (old_reason) {
311                 ret = tdb_delete(tdb, key);
312                 if (ret != 0) {
313                         tdb_transaction_cancel(tdb);
314                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
315                                            tdb_name(tdb), ctdb_db->db_name,
316                                            ret, tdb_errorstr(tdb)));
317                         talloc_free(new_reason);
318                         return -1;
319                 }
320                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
321                                    ctdb_db->db_name));
322         }
323
324         ret = tdb_transaction_commit(tdb);
325         if (ret != TDB_SUCCESS) {
326                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
327                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
328                 talloc_free(new_reason);
329                 return -1;
330         }
331
332         talloc_free(old_reason);
333         ctdb_db->unhealthy_reason = new_reason;
334
335         return 0;
336 }
337
338 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
339                                      struct ctdb_db_context *ctdb_db)
340 {
341         time_t now = time(NULL);
342         char *new_path;
343         char *new_reason;
344         int ret;
345         struct tm *tm;
346
347         tm = gmtime(&now);
348
349         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
350         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
351                                    "%04u%02u%02u%02u%02u%02u.0Z",
352                                    ctdb_db->db_path,
353                                    tm->tm_year+1900, tm->tm_mon+1,
354                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
355                                    tm->tm_sec);
356         if (new_path == NULL) {
357                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
358                 return -1;
359         }
360
361         new_reason = talloc_asprintf(ctdb_db,
362                                      "ERROR - Backup of corrupted TDB in '%s'",
363                                      new_path);
364         if (new_reason == NULL) {
365                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
366                 return -1;
367         }
368         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
369         talloc_free(new_reason);
370         if (ret != 0) {
371                 DEBUG(DEBUG_CRIT,(__location__
372                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
373                                  ctdb_db->db_path));
374                 return -1;
375         }
376
377         ret = rename(ctdb_db->db_path, new_path);
378         if (ret != 0) {
379                 DEBUG(DEBUG_CRIT,(__location__
380                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
381                                   ctdb_db->db_path, new_path,
382                                   errno, strerror(errno)));
383                 talloc_free(new_path);
384                 return -1;
385         }
386
387         DEBUG(DEBUG_CRIT,(__location__
388                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
389                          ctdb_db->db_path, new_path));
390         talloc_free(new_path);
391         return 0;
392 }
393
394 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
395 {
396         struct ctdb_db_context *ctdb_db;
397         int ret;
398         int ok = 0;
399         int fail = 0;
400
401         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
402                 if (!ctdb_db->persistent) {
403                         continue;
404                 }
405
406                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
407                 if (ret != 0) {
408                         DEBUG(DEBUG_ALERT,(__location__
409                                            " load persistent health for '%s' failed\n",
410                                            ctdb_db->db_path));
411                         return -1;
412                 }
413
414                 if (ctdb_db->unhealthy_reason == NULL) {
415                         ok++;
416                         DEBUG(DEBUG_INFO,(__location__
417                                    " persistent db '%s' healthy\n",
418                                    ctdb_db->db_path));
419                         continue;
420                 }
421
422                 fail++;
423                 DEBUG(DEBUG_ALERT,(__location__
424                                    " persistent db '%s' unhealthy: %s\n",
425                                    ctdb_db->db_path,
426                                    ctdb_db->unhealthy_reason));
427         }
428         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
429               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
430                ok, fail));
431
432         if (fail != 0) {
433                 return -1;
434         }
435
436         return 0;
437 }
438
439
440 /*
441   mark a database - as healthy
442  */
443 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
444 {
445         uint32_t db_id = *(uint32_t *)indata.dptr;
446         struct ctdb_db_context *ctdb_db;
447         int ret;
448         bool may_recover = false;
449
450         ctdb_db = find_ctdb_db(ctdb, db_id);
451         if (!ctdb_db) {
452                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
453                 return -1;
454         }
455
456         if (ctdb_db->unhealthy_reason) {
457                 may_recover = true;
458         }
459
460         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
461         if (ret != 0) {
462                 DEBUG(DEBUG_ERR,(__location__
463                                  " ctdb_update_persistent_health(%s) failed\n",
464                                  ctdb_db->db_name));
465                 return -1;
466         }
467
468         if (may_recover && !ctdb->done_startup) {
469                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
470                                   ctdb_db->db_name));
471                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
472         }
473
474         return 0;
475 }
476
477 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
478                                    TDB_DATA indata,
479                                    TDB_DATA *outdata)
480 {
481         uint32_t db_id = *(uint32_t *)indata.dptr;
482         struct ctdb_db_context *ctdb_db;
483         int ret;
484
485         ctdb_db = find_ctdb_db(ctdb, db_id);
486         if (!ctdb_db) {
487                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
488                 return -1;
489         }
490
491         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
492         if (ret != 0) {
493                 DEBUG(DEBUG_ERR,(__location__
494                                  " ctdb_load_persistent_health(%s) failed\n",
495                                  ctdb_db->db_name));
496                 return -1;
497         }
498
499         *outdata = tdb_null;
500         if (ctdb_db->unhealthy_reason) {
501                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
502                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
503         }
504
505         return 0;
506 }
507
508 /*
509   attach to a database, handling both persistent and non-persistent databases
510   return 0 on success, -1 on failure
511  */
512 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
513                              bool persistent, const char *unhealthy_reason)
514 {
515         struct ctdb_db_context *ctdb_db, *tmp_db;
516         int ret;
517         struct TDB_DATA key;
518         unsigned tdb_flags;
519         int mode = 0600;
520         int remaining_tries = 0;
521
522         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
523         CTDB_NO_MEMORY(ctdb, ctdb_db);
524
525         ctdb_db->priority = 1;
526         ctdb_db->ctdb = ctdb;
527         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
528         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
529
530         key.dsize = strlen(db_name)+1;
531         key.dptr  = discard_const(db_name);
532         ctdb_db->db_id = ctdb_hash(&key);
533         ctdb_db->persistent = persistent;
534
535         /* check for hash collisions */
536         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
537                 if (tmp_db->db_id == ctdb_db->db_id) {
538                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
539                                  tmp_db->db_id, db_name, tmp_db->db_name));
540                         talloc_free(ctdb_db);
541                         return -1;
542                 }
543         }
544
545         if (persistent) {
546                 if (unhealthy_reason) {
547                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
548                                                             unhealthy_reason, 0);
549                         if (ret != 0) {
550                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
551                                                    ctdb_db->db_name, unhealthy_reason, ret));
552                                 talloc_free(ctdb_db);
553                                 return -1;
554                         }
555                 }
556
557                 if (ctdb->max_persistent_check_errors > 0) {
558                         remaining_tries = 1;
559                 }
560                 if (ctdb->done_startup) {
561                         remaining_tries = 0;
562                 }
563
564                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
565                 if (ret != 0) {
566                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
567                                    ctdb_db->db_name, ret));
568                         talloc_free(ctdb_db);
569                         return -1;
570                 }
571         }
572
573         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
574                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
575                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
576                 talloc_free(ctdb_db);
577                 return -1;
578         }
579
580         if (ctdb_db->unhealthy_reason) {
581                 /* this is just a warning, but we want that in the log file! */
582                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
583                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
584         }
585
586         /* open the database */
587         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
588                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
589                                            db_name, ctdb->pnn);
590
591         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
592         if (ctdb->valgrinding) {
593                 tdb_flags |= TDB_NOMMAP;
594         }
595         tdb_flags |= TDB_DISALLOW_NESTING;
596
597 again:
598         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
599                                       ctdb->tunable.database_hash_size, 
600                                       tdb_flags, 
601                                       O_CREAT|O_RDWR, mode);
602         if (ctdb_db->ltdb == NULL) {
603                 struct stat st;
604                 int saved_errno = errno;
605
606                 if (!persistent) {
607                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
608                                           ctdb_db->db_path,
609                                           saved_errno,
610                                           strerror(saved_errno)));
611                         talloc_free(ctdb_db);
612                         return -1;
613                 }
614
615                 if (remaining_tries == 0) {
616                         DEBUG(DEBUG_CRIT,(__location__
617                                           "Failed to open persistent tdb '%s': %d - %s\n",
618                                           ctdb_db->db_path,
619                                           saved_errno,
620                                           strerror(saved_errno)));
621                         talloc_free(ctdb_db);
622                         return -1;
623                 }
624
625                 ret = stat(ctdb_db->db_path, &st);
626                 if (ret != 0) {
627                         DEBUG(DEBUG_CRIT,(__location__
628                                           "Failed to open persistent tdb '%s': %d - %s\n",
629                                           ctdb_db->db_path,
630                                           saved_errno,
631                                           strerror(saved_errno)));
632                         talloc_free(ctdb_db);
633                         return -1;
634                 }
635
636                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
637                 if (ret != 0) {
638                         DEBUG(DEBUG_CRIT,(__location__
639                                           "Failed to open persistent tdb '%s': %d - %s\n",
640                                           ctdb_db->db_path,
641                                           saved_errno,
642                                           strerror(saved_errno)));
643                         talloc_free(ctdb_db);
644                         return -1;
645                 }
646
647                 remaining_tries--;
648                 mode = st.st_mode;
649                 goto again;
650         }
651
652         if (!persistent) {
653                 ctdb_check_db_empty(ctdb_db);
654         } else {
655                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
656                 if (ret != 0) {
657                         int fd;
658                         struct stat st;
659
660                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
661                                           ctdb_db->db_path, ret,
662                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
663                         if (remaining_tries == 0) {
664                                 talloc_free(ctdb_db);
665                                 return -1;
666                         }
667
668                         fd = tdb_fd(ctdb_db->ltdb->tdb);
669                         ret = fstat(fd, &st);
670                         if (ret != 0) {
671                                 DEBUG(DEBUG_CRIT,(__location__
672                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
673                                                   ctdb_db->db_path,
674                                                   errno,
675                                                   strerror(errno)));
676                                 talloc_free(ctdb_db);
677                                 return -1;
678                         }
679
680                         /* close the TDB */
681                         talloc_free(ctdb_db->ltdb);
682                         ctdb_db->ltdb = NULL;
683
684                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
685                         if (ret != 0) {
686                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
687                                                   ctdb_db->db_path));
688                                 talloc_free(ctdb_db);
689                                 return -1;
690                         }
691
692                         remaining_tries--;
693                         mode = st.st_mode;
694                         goto again;
695                 }
696         }
697
698         DLIST_ADD(ctdb->db_list, ctdb_db);
699
700         /* setting this can help some high churn databases */
701         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
702
703         /* 
704            all databases support the "null" function. we need this in
705            order to do forced migration of records
706         */
707         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
708         if (ret != 0) {
709                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
710                 talloc_free(ctdb_db);
711                 return -1;
712         }
713
714         /* 
715            all databases support the "fetch" function. we need this
716            for efficient Samba3 ctdb fetch
717         */
718         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
719         if (ret != 0) {
720                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
721                 talloc_free(ctdb_db);
722                 return -1;
723         }
724
725         ret = ctdb_vacuum_init(ctdb_db);
726         if (ret != 0) {
727                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
728                                   "database '%s'\n", ctdb_db->db_name));
729                 talloc_free(ctdb_db);
730                 return -1;
731         }
732
733
734         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
735         
736         /* success */
737         return 0;
738 }
739
740
741 /*
742   a client has asked to attach a new database
743  */
744 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
745                                TDB_DATA *outdata, uint64_t tdb_flags, 
746                                bool persistent)
747 {
748         const char *db_name = (const char *)indata.dptr;
749         struct ctdb_db_context *db;
750         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
751
752         /* the client can optionally pass additional tdb flags, but we
753            only allow a subset of those on the database in ctdb. Note
754            that tdb_flags is passed in via the (otherwise unused)
755            srvid to the attach control */
756         tdb_flags &= TDB_NOSYNC;
757
758         /* If the node is inactive it is not part of the cluster
759            and we should not allow clients to attach to any
760            databases
761         */
762         if (node->flags & NODE_FLAGS_INACTIVE) {
763                 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
764                 return -1;
765         }
766
767
768         /* see if we already have this name */
769         db = ctdb_db_handle(ctdb, db_name);
770         if (db) {
771                 outdata->dptr  = (uint8_t *)&db->db_id;
772                 outdata->dsize = sizeof(db->db_id);
773                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
774                 return 0;
775         }
776
777         if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
778                 return -1;
779         }
780
781         db = ctdb_db_handle(ctdb, db_name);
782         if (!db) {
783                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
784                 return -1;
785         }
786
787         /* remember the flags the client has specified */
788         tdb_add_flags(db->ltdb->tdb, tdb_flags);
789
790         outdata->dptr  = (uint8_t *)&db->db_id;
791         outdata->dsize = sizeof(db->db_id);
792
793         /* Try to ensure it's locked in mem */
794         ctdb_lockdown_memory(ctdb);
795
796         /* tell all the other nodes about this database */
797         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
798                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
799                                                 CTDB_CONTROL_DB_ATTACH,
800                                  0, CTDB_CTRL_FLAG_NOREPLY,
801                                  indata, NULL, NULL);
802
803         /* success */
804         return 0;
805 }
806
807
808 /*
809   attach to all existing persistent databases
810  */
811 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
812                                   const char *unhealthy_reason)
813 {
814         DIR *d;
815         struct dirent *de;
816
817         /* open the persistent db directory and scan it for files */
818         d = opendir(ctdb->db_directory_persistent);
819         if (d == NULL) {
820                 return 0;
821         }
822
823         while ((de=readdir(d))) {
824                 char *p, *s, *q;
825                 size_t len = strlen(de->d_name);
826                 uint32_t node;
827                 int invalid_name = 0;
828                 
829                 s = talloc_strdup(ctdb, de->d_name);
830                 CTDB_NO_MEMORY(ctdb, s);
831
832                 /* only accept names ending in .tdb */
833                 p = strstr(s, ".tdb.");
834                 if (len < 7 || p == NULL) {
835                         talloc_free(s);
836                         continue;
837                 }
838
839                 /* only accept names ending with .tdb. and any number of digits */
840                 q = p+5;
841                 while (*q != 0 && invalid_name == 0) {
842                         if (!isdigit(*q++)) {
843                                 invalid_name = 1;
844                         }
845                 }
846                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
847                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
848                         talloc_free(s);
849                         continue;
850                 }
851                 p[4] = 0;
852
853                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
854                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
855                         closedir(d);
856                         talloc_free(s);
857                         return -1;
858                 }
859
860                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
861
862                 talloc_free(s);
863         }
864         closedir(d);
865         return 0;
866 }
867
868 int ctdb_attach_databases(struct ctdb_context *ctdb)
869 {
870         int ret;
871         char *persistent_health_path = NULL;
872         char *unhealthy_reason = NULL;
873         bool first_try = true;
874
875         if (ctdb->db_directory == NULL) {
876                 ctdb->db_directory = VARDIR "/ctdb";
877         }
878         if (ctdb->db_directory_persistent == NULL) {
879                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
880         }
881         if (ctdb->db_directory_state == NULL) {
882                 ctdb->db_directory_state = VARDIR "/ctdb/state";
883         }
884
885         /* make sure the db directory exists */
886         ret = mkdir(ctdb->db_directory, 0700);
887         if (ret == -1 && errno != EEXIST) {
888                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
889                          ctdb->db_directory));
890                 return -1;
891         }
892
893         /* make sure the persistent db directory exists */
894         ret = mkdir(ctdb->db_directory_persistent, 0700);
895         if (ret == -1 && errno != EEXIST) {
896                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
897                          ctdb->db_directory_persistent));
898                 return -1;
899         }
900
901         /* make sure the internal state db directory exists */
902         ret = mkdir(ctdb->db_directory_state, 0700);
903         if (ret == -1 && errno != EEXIST) {
904                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
905                          ctdb->db_directory_state));
906                 return -1;
907         }
908
909         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
910                                                  ctdb->db_directory_state,
911                                                  PERSISTENT_HEALTH_TDB,
912                                                  ctdb->pnn);
913         if (persistent_health_path == NULL) {
914                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
915                 return -1;
916         }
917
918 again:
919
920         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
921                                                    0, TDB_DISALLOW_NESTING,
922                                                    O_CREAT | O_RDWR, 0600);
923         if (ctdb->db_persistent_health == NULL) {
924                 struct tdb_wrap *tdb;
925
926                 if (!first_try) {
927                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
928                                           persistent_health_path,
929                                           errno,
930                                           strerror(errno)));
931                         talloc_free(persistent_health_path);
932                         talloc_free(unhealthy_reason);
933                         return -1;
934                 }
935                 first_try = false;
936
937                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
938                                                    persistent_health_path,
939                                                    "was cleared after a failure",
940                                                    "manual verification needed");
941                 if (unhealthy_reason == NULL) {
942                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
943                         talloc_free(persistent_health_path);
944                         return -1;
945                 }
946
947                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
948                                   persistent_health_path));
949                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
950                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
951                                     O_CREAT | O_RDWR, 0600);
952                 if (tdb) {
953                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
954                                           persistent_health_path,
955                                           errno,
956                                           strerror(errno)));
957                         talloc_free(persistent_health_path);
958                         talloc_free(unhealthy_reason);
959                         return -1;
960                 }
961
962                 talloc_free(tdb);
963                 goto again;
964         }
965         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
966         if (ret != 0) {
967                 struct tdb_wrap *tdb;
968
969                 talloc_free(ctdb->db_persistent_health);
970                 ctdb->db_persistent_health = NULL;
971
972                 if (!first_try) {
973                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
974                                           persistent_health_path));
975                         talloc_free(persistent_health_path);
976                         talloc_free(unhealthy_reason);
977                         return -1;
978                 }
979                 first_try = false;
980
981                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
982                                                    persistent_health_path,
983                                                    "was cleared after a failure",
984                                                    "manual verification needed");
985                 if (unhealthy_reason == NULL) {
986                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
987                         talloc_free(persistent_health_path);
988                         return -1;
989                 }
990
991                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
992                                   persistent_health_path));
993                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
994                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
995                                     O_CREAT | O_RDWR, 0600);
996                 if (tdb) {
997                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
998                                           persistent_health_path,
999                                           errno,
1000                                           strerror(errno)));
1001                         talloc_free(persistent_health_path);
1002                         talloc_free(unhealthy_reason);
1003                         return -1;
1004                 }
1005
1006                 talloc_free(tdb);
1007                 goto again;
1008         }
1009         talloc_free(persistent_health_path);
1010
1011         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1012         talloc_free(unhealthy_reason);
1013         if (ret != 0) {
1014                 return ret;
1015         }
1016
1017         return 0;
1018 }
1019
1020 /*
1021   called when a broadcast seqnum update comes in
1022  */
1023 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1024 {
1025         struct ctdb_db_context *ctdb_db;
1026         if (srcnode == ctdb->pnn) {
1027                 /* don't update ourselves! */
1028                 return 0;
1029         }
1030
1031         ctdb_db = find_ctdb_db(ctdb, db_id);
1032         if (!ctdb_db) {
1033                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1034                 return -1;
1035         }
1036
1037         if (ctdb_db->unhealthy_reason) {
1038                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1039                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1040                 return -1;
1041         }
1042
1043         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1044         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1045         return 0;
1046 }
1047
1048 /*
1049   timer to check for seqnum changes in a ltdb and propogate them
1050  */
1051 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1052                                    struct timeval t, void *p)
1053 {
1054         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1055         struct ctdb_context *ctdb = ctdb_db->ctdb;
1056         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1057         if (new_seqnum != ctdb_db->seqnum) {
1058                 /* something has changed - propogate it */
1059                 TDB_DATA data;
1060                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1061                 data.dsize = sizeof(uint32_t);
1062                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1063                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1064                                          data, NULL, NULL);             
1065         }
1066         ctdb_db->seqnum = new_seqnum;
1067
1068         /* setup a new timer */
1069         ctdb_db->seqnum_update =
1070                 event_add_timed(ctdb->ev, ctdb_db, 
1071                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1072                                 ctdb_ltdb_seqnum_check, ctdb_db);
1073 }
1074
1075 /*
1076   enable seqnum handling on this db
1077  */
1078 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1079 {
1080         struct ctdb_db_context *ctdb_db;
1081         ctdb_db = find_ctdb_db(ctdb, db_id);
1082         if (!ctdb_db) {
1083                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1084                 return -1;
1085         }
1086
1087         if (ctdb_db->seqnum_update == NULL) {
1088                 ctdb_db->seqnum_update =
1089                         event_add_timed(ctdb->ev, ctdb_db, 
1090                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1091                                         ctdb_ltdb_seqnum_check, ctdb_db);
1092         }
1093
1094         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1095         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1096         return 0;
1097 }
1098
1099 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1100 {
1101         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1102         struct ctdb_db_context *ctdb_db;
1103
1104         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1105         if (!ctdb_db) {
1106                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1107                 return -1;
1108         }
1109
1110         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1111                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1112                 return -1;
1113         }
1114
1115         ctdb_db->priority = db_prio->priority;
1116         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1117
1118         return 0;
1119 }
1120
1121