event: Update events to latest Samba version 0.9.8
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "db_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
33
/*
  no-op call handler: ignores the call it is given and always
  reports success
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
        (void)call;

        return 0;
}
41
42 /*
43   this is a plain fetch procedure that all databases support
44 */
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
46 {
47         call->reply_data = &call->record_data;
48         return 0;
49 }
50
51
52
/*
  state kept for a request that is waiting for a chainlock, see
  ctdb_ltdb_lock_requeue() and lock_fetch_callback()
*/
struct lock_fetch_state {
        /* daemon context, consulted for the current vnn_map generation */
        struct ctdb_context *ctdb;

        /* function the deferred packet is handed back to, and its argument */
        void (*recv_pkt)(void *, struct ctdb_req_header *);
        void *recv_context;

        /* the deferred request packet */
        struct ctdb_req_header *hdr;

        /* vnn_map generation recorded when the request was deferred */
        uint32_t generation;

        /* when true, the packet is re-submitted even if the generation changed */
        bool ignore_generation;
};
61
62 /*
63   called when we should retry the operation
64  */
65 static void lock_fetch_callback(void *p)
66 {
67         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
68         if (!state->ignore_generation &&
69             state->generation != state->ctdb->vnn_map->generation) {
70                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71                 talloc_free(state->hdr);
72                 return;
73         }
74         state->recv_pkt(state->recv_context, state->hdr);
75         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
76 }
77
78
79 /*
80   do a non-blocking ltdb_lock, deferring this ctdb request until we
81   have the chainlock
82
83   It does the following:
84
85    1) tries to get the chainlock. If it succeeds, then it returns 0
86
87    2) if it fails to get a chainlock immediately then it sets up a
88    non-blocking chainlock via ctdb_lockwait, and when it gets the
89    chainlock it re-submits this ctdb request to the main packet
90    receive function
91
92    This effectively queues all ctdb requests that cannot be
93    immediately satisfied until it can get the lock. This means that
94    the main ctdb daemon will not block waiting for a chainlock held by
95    a client
96
97    There are 3 possible return values:
98
99        0:    means that it got the lock immediately.
100       -1:    means that it failed to get the lock, and won't retry
101       -2:    means that it failed to get the lock immediately, but will retry
102  */
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
104                            TDB_DATA key, struct ctdb_req_header *hdr,
105                            void (*recv_pkt)(void *, struct ctdb_req_header *),
106                            void *recv_context, bool ignore_generation)
107 {
108         int ret;
109         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110         struct lockwait_handle *h;
111         struct lock_fetch_state *state;
112         
113         ret = tdb_chainlock_nonblock(tdb, key);
114
115         if (ret != 0 &&
116             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117                 /* a hard failure - don't try again */
118                 return -1;
119         }
120
121         /* when torturing, ensure we test the contended path */
122         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
123             random() % 5 == 0) {
124                 ret = -1;
125                 tdb_chainunlock(tdb, key);
126         }
127
128         /* first the non-contended path */
129         if (ret == 0) {
130                 return 0;
131         }
132
133         state = talloc(hdr, struct lock_fetch_state);
134         state->ctdb = ctdb_db->ctdb;
135         state->hdr = hdr;
136         state->recv_pkt = recv_pkt;
137         state->recv_context = recv_context;
138         state->generation = ctdb_db->ctdb->vnn_map->generation;
139         state->ignore_generation = ignore_generation;
140
141         /* now the contended path */
142         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
143         if (h == NULL) {
144                 return -1;
145         }
146
147         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
148            so it won't be freed yet */
149         talloc_steal(state, hdr);
150         talloc_steal(state, h);
151
152         /* now tell the caller than we will retry asynchronously */
153         return -2;
154 }
155
156 /*
157   a varient of ctdb_ltdb_lock_requeue that also fetches the record
158  */
159 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
160                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
161                                  struct ctdb_req_header *hdr, TDB_DATA *data,
162                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
163                                  void *recv_context, bool ignore_generation)
164 {
165         int ret;
166
167         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
168                                      recv_context, ignore_generation);
169         if (ret == 0) {
170                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
171                 if (ret != 0) {
172                         int uret;
173                         uret = ctdb_ltdb_unlock(ctdb_db, key);
174                         if (uret != 0) {
175                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
176                         }
177                 }
178         }
179         return ret;
180 }
181
182
183 /*
184   paraoid check to see if the db is empty
185  */
186 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
187 {
188         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
189         int count = tdb_traverse_read(tdb, NULL, NULL);
190         if (count != 0) {
191                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
192                          ctdb_db->db_path));
193                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
194         }
195 }
196
197 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
198                                 struct ctdb_db_context *ctdb_db)
199 {
200         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
201         char *old;
202         char *reason = NULL;
203         TDB_DATA key;
204         TDB_DATA val;
205
206         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
207         key.dsize = strlen(ctdb_db->db_name);
208
209         old = ctdb_db->unhealthy_reason;
210         ctdb_db->unhealthy_reason = NULL;
211
212         val = tdb_fetch(tdb, key);
213         if (val.dsize > 0) {
214                 reason = talloc_strndup(ctdb_db,
215                                         (const char *)val.dptr,
216                                         val.dsize);
217                 if (reason == NULL) {
218                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
219                                            (int)val.dsize));
220                         ctdb_db->unhealthy_reason = old;
221                         free(val.dptr);
222                         return -1;
223                 }
224         }
225
226         if (val.dptr) {
227                 free(val.dptr);
228         }
229
230         talloc_free(old);
231         ctdb_db->unhealthy_reason = reason;
232         return 0;
233 }
234
235 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
236                                   struct ctdb_db_context *ctdb_db,
237                                   const char *given_reason,/* NULL means healthy */
238                                   int num_healthy_nodes)
239 {
240         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
241         int ret;
242         TDB_DATA key;
243         TDB_DATA val;
244         char *new_reason = NULL;
245         char *old_reason = NULL;
246
247         ret = tdb_transaction_start(tdb);
248         if (ret != 0) {
249                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
250                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
251                 return -1;
252         }
253
254         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
255         if (ret != 0) {
256                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
257                                    ctdb_db->db_name, ret));
258                 return -1;
259         }
260         old_reason = ctdb_db->unhealthy_reason;
261
262         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
263         key.dsize = strlen(ctdb_db->db_name);
264
265         if (given_reason) {
266                 new_reason = talloc_strdup(ctdb_db, given_reason);
267                 if (new_reason == NULL) {
268                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
269                                           given_reason));
270                         return -1;
271                 }
272         } else if (old_reason && num_healthy_nodes == 0) {
273                 /*
274                  * If the reason indicates ok, but there where no healthy nodes
275                  * available, that it means, we have not recovered valid content
276                  * of the db. So if there's an old reason, prefix it with
277                  * "NO-HEALTHY-NODES - "
278                  */
279                 const char *prefix;
280
281 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
282                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
283                 if (ret != 0) {
284                         prefix = _TMP_PREFIX;
285                 } else {
286                         prefix = "";
287                 }
288                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
289                                          prefix, old_reason);
290                 if (new_reason == NULL) {
291                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
292                                           prefix, old_reason));
293                         return -1;
294                 }
295 #undef _TMP_PREFIX
296         }
297
298         if (new_reason) {
299                 val.dptr = discard_const_p(uint8_t, new_reason);
300                 val.dsize = strlen(new_reason);
301
302                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
303                 if (ret != 0) {
304                         tdb_transaction_cancel(tdb);
305                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
306                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
307                                            ret, tdb_errorstr(tdb)));
308                         talloc_free(new_reason);
309                         return -1;
310                 }
311                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
312                                    ctdb_db->db_name, new_reason));
313         } else if (old_reason) {
314                 ret = tdb_delete(tdb, key);
315                 if (ret != 0) {
316                         tdb_transaction_cancel(tdb);
317                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
318                                            tdb_name(tdb), ctdb_db->db_name,
319                                            ret, tdb_errorstr(tdb)));
320                         talloc_free(new_reason);
321                         return -1;
322                 }
323                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
324                                    ctdb_db->db_name));
325         }
326
327         ret = tdb_transaction_commit(tdb);
328         if (ret != TDB_SUCCESS) {
329                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
330                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
331                 talloc_free(new_reason);
332                 return -1;
333         }
334
335         talloc_free(old_reason);
336         ctdb_db->unhealthy_reason = new_reason;
337
338         return 0;
339 }
340
341 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
342                                      struct ctdb_db_context *ctdb_db)
343 {
344         time_t now = time(NULL);
345         char *new_path;
346         char *new_reason;
347         int ret;
348         struct tm *tm;
349
350         tm = gmtime(&now);
351
352         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
353         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
354                                    "%04u%02u%02u%02u%02u%02u.0Z",
355                                    ctdb_db->db_path,
356                                    tm->tm_year+1900, tm->tm_mon+1,
357                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
358                                    tm->tm_sec);
359         if (new_path == NULL) {
360                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
361                 return -1;
362         }
363
364         new_reason = talloc_asprintf(ctdb_db,
365                                      "ERROR - Backup of corrupted TDB in '%s'",
366                                      new_path);
367         if (new_reason == NULL) {
368                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
369                 return -1;
370         }
371         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
372         talloc_free(new_reason);
373         if (ret != 0) {
374                 DEBUG(DEBUG_CRIT,(__location__
375                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
376                                  ctdb_db->db_path));
377                 return -1;
378         }
379
380         ret = rename(ctdb_db->db_path, new_path);
381         if (ret != 0) {
382                 DEBUG(DEBUG_CRIT,(__location__
383                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
384                                   ctdb_db->db_path, new_path,
385                                   errno, strerror(errno)));
386                 talloc_free(new_path);
387                 return -1;
388         }
389
390         DEBUG(DEBUG_CRIT,(__location__
391                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
392                          ctdb_db->db_path, new_path));
393         talloc_free(new_path);
394         return 0;
395 }
396
397 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
398 {
399         struct ctdb_db_context *ctdb_db;
400         int ret;
401         int ok = 0;
402         int fail = 0;
403
404         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
405                 if (!ctdb_db->persistent) {
406                         continue;
407                 }
408
409                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
410                 if (ret != 0) {
411                         DEBUG(DEBUG_ALERT,(__location__
412                                            " load persistent health for '%s' failed\n",
413                                            ctdb_db->db_path));
414                         return -1;
415                 }
416
417                 if (ctdb_db->unhealthy_reason == NULL) {
418                         ok++;
419                         DEBUG(DEBUG_INFO,(__location__
420                                    " persistent db '%s' healthy\n",
421                                    ctdb_db->db_path));
422                         continue;
423                 }
424
425                 fail++;
426                 DEBUG(DEBUG_ALERT,(__location__
427                                    " persistent db '%s' unhealthy: %s\n",
428                                    ctdb_db->db_path,
429                                    ctdb_db->unhealthy_reason));
430         }
431         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
432               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
433                ok, fail));
434
435         if (fail != 0) {
436                 return -1;
437         }
438
439         return 0;
440 }
441
442
443 /*
444   mark a database - as healthy
445  */
446 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
447 {
448         uint32_t db_id = *(uint32_t *)indata.dptr;
449         struct ctdb_db_context *ctdb_db;
450         int ret;
451         bool may_recover = false;
452
453         ctdb_db = find_ctdb_db(ctdb, db_id);
454         if (!ctdb_db) {
455                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
456                 return -1;
457         }
458
459         if (ctdb_db->unhealthy_reason) {
460                 may_recover = true;
461         }
462
463         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
464         if (ret != 0) {
465                 DEBUG(DEBUG_ERR,(__location__
466                                  " ctdb_update_persistent_health(%s) failed\n",
467                                  ctdb_db->db_name));
468                 return -1;
469         }
470
471         if (may_recover && !ctdb->done_startup) {
472                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
473                                   ctdb_db->db_name));
474                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
475         }
476
477         return 0;
478 }
479
480 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
481                                    TDB_DATA indata,
482                                    TDB_DATA *outdata)
483 {
484         uint32_t db_id = *(uint32_t *)indata.dptr;
485         struct ctdb_db_context *ctdb_db;
486         int ret;
487
488         ctdb_db = find_ctdb_db(ctdb, db_id);
489         if (!ctdb_db) {
490                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
491                 return -1;
492         }
493
494         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
495         if (ret != 0) {
496                 DEBUG(DEBUG_ERR,(__location__
497                                  " ctdb_load_persistent_health(%s) failed\n",
498                                  ctdb_db->db_name));
499                 return -1;
500         }
501
502         *outdata = tdb_null;
503         if (ctdb_db->unhealthy_reason) {
504                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
505                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
506         }
507
508         return 0;
509 }
510
511 /*
512   attach to a database, handling both persistent and non-persistent databases
513   return 0 on success, -1 on failure
514  */
515 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
516                              bool persistent, const char *unhealthy_reason)
517 {
518         struct ctdb_db_context *ctdb_db, *tmp_db;
519         int ret;
520         struct TDB_DATA key;
521         unsigned tdb_flags;
522         int mode = 0600;
523         int remaining_tries = 0;
524
525         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
526         CTDB_NO_MEMORY(ctdb, ctdb_db);
527
528         ctdb_db->priority = 1;
529         ctdb_db->ctdb = ctdb;
530         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
531         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
532
533         key.dsize = strlen(db_name)+1;
534         key.dptr  = discard_const(db_name);
535         ctdb_db->db_id = ctdb_hash(&key);
536         ctdb_db->persistent = persistent;
537
538         /* check for hash collisions */
539         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
540                 if (tmp_db->db_id == ctdb_db->db_id) {
541                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
542                                  tmp_db->db_id, db_name, tmp_db->db_name));
543                         talloc_free(ctdb_db);
544                         return -1;
545                 }
546         }
547
548         if (persistent) {
549                 if (unhealthy_reason) {
550                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
551                                                             unhealthy_reason, 0);
552                         if (ret != 0) {
553                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
554                                                    ctdb_db->db_name, unhealthy_reason, ret));
555                                 talloc_free(ctdb_db);
556                                 return -1;
557                         }
558                 }
559
560                 if (ctdb->max_persistent_check_errors > 0) {
561                         remaining_tries = 1;
562                 }
563                 if (ctdb->done_startup) {
564                         remaining_tries = 0;
565                 }
566
567                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
568                 if (ret != 0) {
569                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
570                                    ctdb_db->db_name, ret));
571                         talloc_free(ctdb_db);
572                         return -1;
573                 }
574         }
575
576         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
577                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
578                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
579                 talloc_free(ctdb_db);
580                 return -1;
581         }
582
583         if (ctdb_db->unhealthy_reason) {
584                 /* this is just a warning, but we want that in the log file! */
585                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
586                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
587         }
588
589         /* open the database */
590         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
591                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
592                                            db_name, ctdb->pnn);
593
594         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
595         if (ctdb->valgrinding) {
596                 tdb_flags |= TDB_NOMMAP;
597         }
598         tdb_flags |= TDB_DISALLOW_NESTING;
599
600 again:
601         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
602                                       ctdb->tunable.database_hash_size, 
603                                       tdb_flags, 
604                                       O_CREAT|O_RDWR, mode);
605         if (ctdb_db->ltdb == NULL) {
606                 struct stat st;
607                 int saved_errno = errno;
608
609                 if (!persistent) {
610                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
611                                           ctdb_db->db_path,
612                                           saved_errno,
613                                           strerror(saved_errno)));
614                         talloc_free(ctdb_db);
615                         return -1;
616                 }
617
618                 if (remaining_tries == 0) {
619                         DEBUG(DEBUG_CRIT,(__location__
620                                           "Failed to open persistent tdb '%s': %d - %s\n",
621                                           ctdb_db->db_path,
622                                           saved_errno,
623                                           strerror(saved_errno)));
624                         talloc_free(ctdb_db);
625                         return -1;
626                 }
627
628                 ret = stat(ctdb_db->db_path, &st);
629                 if (ret != 0) {
630                         DEBUG(DEBUG_CRIT,(__location__
631                                           "Failed to open persistent tdb '%s': %d - %s\n",
632                                           ctdb_db->db_path,
633                                           saved_errno,
634                                           strerror(saved_errno)));
635                         talloc_free(ctdb_db);
636                         return -1;
637                 }
638
639                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
640                 if (ret != 0) {
641                         DEBUG(DEBUG_CRIT,(__location__
642                                           "Failed to open persistent tdb '%s': %d - %s\n",
643                                           ctdb_db->db_path,
644                                           saved_errno,
645                                           strerror(saved_errno)));
646                         talloc_free(ctdb_db);
647                         return -1;
648                 }
649
650                 remaining_tries--;
651                 mode = st.st_mode;
652                 goto again;
653         }
654
655         if (!persistent) {
656                 ctdb_check_db_empty(ctdb_db);
657         } else {
658                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
659                 if (ret != 0) {
660                         int fd;
661                         struct stat st;
662
663                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
664                                           ctdb_db->db_path, ret,
665                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
666                         if (remaining_tries == 0) {
667                                 talloc_free(ctdb_db);
668                                 return -1;
669                         }
670
671                         fd = tdb_fd(ctdb_db->ltdb->tdb);
672                         ret = fstat(fd, &st);
673                         if (ret != 0) {
674                                 DEBUG(DEBUG_CRIT,(__location__
675                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
676                                                   ctdb_db->db_path,
677                                                   errno,
678                                                   strerror(errno)));
679                                 talloc_free(ctdb_db);
680                                 return -1;
681                         }
682
683                         /* close the TDB */
684                         talloc_free(ctdb_db->ltdb);
685                         ctdb_db->ltdb = NULL;
686
687                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
688                         if (ret != 0) {
689                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
690                                                   ctdb_db->db_path));
691                                 talloc_free(ctdb_db);
692                                 return -1;
693                         }
694
695                         remaining_tries--;
696                         mode = st.st_mode;
697                         goto again;
698                 }
699         }
700
701         DLIST_ADD(ctdb->db_list, ctdb_db);
702
703         /* setting this can help some high churn databases */
704         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
705
706         /* 
707            all databases support the "null" function. we need this in
708            order to do forced migration of records
709         */
710         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
711         if (ret != 0) {
712                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
713                 talloc_free(ctdb_db);
714                 return -1;
715         }
716
717         /* 
718            all databases support the "fetch" function. we need this
719            for efficient Samba3 ctdb fetch
720         */
721         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
722         if (ret != 0) {
723                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
724                 talloc_free(ctdb_db);
725                 return -1;
726         }
727
728         ret = ctdb_vacuum_init(ctdb_db);
729         if (ret != 0) {
730                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
731                                   "database '%s'\n", ctdb_db->db_name));
732                 talloc_free(ctdb_db);
733                 return -1;
734         }
735
736
737         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
738         
739         /* success */
740         return 0;
741 }
742
743
744 /*
745   a client has asked to attach a new database
746  */
747 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
748                                TDB_DATA *outdata, uint64_t tdb_flags, 
749                                bool persistent)
750 {
751         const char *db_name = (const char *)indata.dptr;
752         struct ctdb_db_context *db;
753         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
754
755         /* the client can optionally pass additional tdb flags, but we
756            only allow a subset of those on the database in ctdb. Note
757            that tdb_flags is passed in via the (otherwise unused)
758            srvid to the attach control */
759         tdb_flags &= TDB_NOSYNC;
760
761         /* If the node is inactive it is not part of the cluster
762            and we should not allow clients to attach to any
763            databases
764         */
765         if (node->flags & NODE_FLAGS_INACTIVE) {
766                 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
767                 return -1;
768         }
769
770
771         /* see if we already have this name */
772         db = ctdb_db_handle(ctdb, db_name);
773         if (db) {
774                 outdata->dptr  = (uint8_t *)&db->db_id;
775                 outdata->dsize = sizeof(db->db_id);
776                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
777                 return 0;
778         }
779
780         if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
781                 return -1;
782         }
783
784         db = ctdb_db_handle(ctdb, db_name);
785         if (!db) {
786                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
787                 return -1;
788         }
789
790         /* remember the flags the client has specified */
791         tdb_add_flags(db->ltdb->tdb, tdb_flags);
792
793         outdata->dptr  = (uint8_t *)&db->db_id;
794         outdata->dsize = sizeof(db->db_id);
795
796         /* Try to ensure it's locked in mem */
797         ctdb_lockdown_memory(ctdb);
798
799         /* tell all the other nodes about this database */
800         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
801                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
802                                                 CTDB_CONTROL_DB_ATTACH,
803                                  0, CTDB_CTRL_FLAG_NOREPLY,
804                                  indata, NULL, NULL);
805
806         /* success */
807         return 0;
808 }
809
810
811 /*
812   attach to all existing persistent databases
813  */
814 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
815                                   const char *unhealthy_reason)
816 {
817         DIR *d;
818         struct dirent *de;
819
820         /* open the persistent db directory and scan it for files */
821         d = opendir(ctdb->db_directory_persistent);
822         if (d == NULL) {
823                 return 0;
824         }
825
826         while ((de=readdir(d))) {
827                 char *p, *s, *q;
828                 size_t len = strlen(de->d_name);
829                 uint32_t node;
830                 int invalid_name = 0;
831                 
832                 s = talloc_strdup(ctdb, de->d_name);
833                 CTDB_NO_MEMORY(ctdb, s);
834
835                 /* only accept names ending in .tdb */
836                 p = strstr(s, ".tdb.");
837                 if (len < 7 || p == NULL) {
838                         talloc_free(s);
839                         continue;
840                 }
841
842                 /* only accept names ending with .tdb. and any number of digits */
843                 q = p+5;
844                 while (*q != 0 && invalid_name == 0) {
845                         if (!isdigit(*q++)) {
846                                 invalid_name = 1;
847                         }
848                 }
849                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
850                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
851                         talloc_free(s);
852                         continue;
853                 }
854                 p[4] = 0;
855
856                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
857                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
858                         closedir(d);
859                         talloc_free(s);
860                         return -1;
861                 }
862
863                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
864
865                 talloc_free(s);
866         }
867         closedir(d);
868         return 0;
869 }
870
871 int ctdb_attach_databases(struct ctdb_context *ctdb)
872 {
873         int ret;
874         char *persistent_health_path = NULL;
875         char *unhealthy_reason = NULL;
876         bool first_try = true;
877
878         if (ctdb->db_directory == NULL) {
879                 ctdb->db_directory = VARDIR "/ctdb";
880         }
881         if (ctdb->db_directory_persistent == NULL) {
882                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
883         }
884         if (ctdb->db_directory_state == NULL) {
885                 ctdb->db_directory_state = VARDIR "/ctdb/state";
886         }
887
888         /* make sure the db directory exists */
889         ret = mkdir(ctdb->db_directory, 0700);
890         if (ret == -1 && errno != EEXIST) {
891                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
892                          ctdb->db_directory));
893                 return -1;
894         }
895
896         /* make sure the persistent db directory exists */
897         ret = mkdir(ctdb->db_directory_persistent, 0700);
898         if (ret == -1 && errno != EEXIST) {
899                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
900                          ctdb->db_directory_persistent));
901                 return -1;
902         }
903
904         /* make sure the internal state db directory exists */
905         ret = mkdir(ctdb->db_directory_state, 0700);
906         if (ret == -1 && errno != EEXIST) {
907                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
908                          ctdb->db_directory_state));
909                 return -1;
910         }
911
912         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
913                                                  ctdb->db_directory_state,
914                                                  PERSISTENT_HEALTH_TDB,
915                                                  ctdb->pnn);
916         if (persistent_health_path == NULL) {
917                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
918                 return -1;
919         }
920
921 again:
922
923         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
924                                                    0, TDB_DISALLOW_NESTING,
925                                                    O_CREAT | O_RDWR, 0600);
926         if (ctdb->db_persistent_health == NULL) {
927                 struct tdb_wrap *tdb;
928
929                 if (!first_try) {
930                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
931                                           persistent_health_path,
932                                           errno,
933                                           strerror(errno)));
934                         talloc_free(persistent_health_path);
935                         talloc_free(unhealthy_reason);
936                         return -1;
937                 }
938                 first_try = false;
939
940                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
941                                                    persistent_health_path,
942                                                    "was cleared after a failure",
943                                                    "manual verification needed");
944                 if (unhealthy_reason == NULL) {
945                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
946                         talloc_free(persistent_health_path);
947                         return -1;
948                 }
949
950                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
951                                   persistent_health_path));
952                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
953                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
954                                     O_CREAT | O_RDWR, 0600);
955                 if (tdb) {
956                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
957                                           persistent_health_path,
958                                           errno,
959                                           strerror(errno)));
960                         talloc_free(persistent_health_path);
961                         talloc_free(unhealthy_reason);
962                         return -1;
963                 }
964
965                 talloc_free(tdb);
966                 goto again;
967         }
968         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
969         if (ret != 0) {
970                 struct tdb_wrap *tdb;
971
972                 talloc_free(ctdb->db_persistent_health);
973                 ctdb->db_persistent_health = NULL;
974
975                 if (!first_try) {
976                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
977                                           persistent_health_path));
978                         talloc_free(persistent_health_path);
979                         talloc_free(unhealthy_reason);
980                         return -1;
981                 }
982                 first_try = false;
983
984                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
985                                                    persistent_health_path,
986                                                    "was cleared after a failure",
987                                                    "manual verification needed");
988                 if (unhealthy_reason == NULL) {
989                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
990                         talloc_free(persistent_health_path);
991                         return -1;
992                 }
993
994                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
995                                   persistent_health_path));
996                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
997                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
998                                     O_CREAT | O_RDWR, 0600);
999                 if (tdb) {
1000                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1001                                           persistent_health_path,
1002                                           errno,
1003                                           strerror(errno)));
1004                         talloc_free(persistent_health_path);
1005                         talloc_free(unhealthy_reason);
1006                         return -1;
1007                 }
1008
1009                 talloc_free(tdb);
1010                 goto again;
1011         }
1012         talloc_free(persistent_health_path);
1013
1014         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1015         talloc_free(unhealthy_reason);
1016         if (ret != 0) {
1017                 return ret;
1018         }
1019
1020         return 0;
1021 }
1022
1023 /*
1024   called when a broadcast seqnum update comes in
1025  */
1026 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1027 {
1028         struct ctdb_db_context *ctdb_db;
1029         if (srcnode == ctdb->pnn) {
1030                 /* don't update ourselves! */
1031                 return 0;
1032         }
1033
1034         ctdb_db = find_ctdb_db(ctdb, db_id);
1035         if (!ctdb_db) {
1036                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1037                 return -1;
1038         }
1039
1040         if (ctdb_db->unhealthy_reason) {
1041                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1042                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1043                 return -1;
1044         }
1045
1046         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1047         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1048         return 0;
1049 }
1050
1051 /*
1052   timer to check for seqnum changes in a ltdb and propogate them
1053  */
1054 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1055                                    struct timeval t, void *p)
1056 {
1057         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1058         struct ctdb_context *ctdb = ctdb_db->ctdb;
1059         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1060         if (new_seqnum != ctdb_db->seqnum) {
1061                 /* something has changed - propogate it */
1062                 TDB_DATA data;
1063                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1064                 data.dsize = sizeof(uint32_t);
1065                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1066                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1067                                          data, NULL, NULL);             
1068         }
1069         ctdb_db->seqnum = new_seqnum;
1070
1071         /* setup a new timer */
1072         ctdb_db->seqnum_update =
1073                 event_add_timed(ctdb->ev, ctdb_db, 
1074                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1075                                 ctdb_ltdb_seqnum_check, ctdb_db);
1076 }
1077
1078 /*
1079   enable seqnum handling on this db
1080  */
1081 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1082 {
1083         struct ctdb_db_context *ctdb_db;
1084         ctdb_db = find_ctdb_db(ctdb, db_id);
1085         if (!ctdb_db) {
1086                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1087                 return -1;
1088         }
1089
1090         if (ctdb_db->seqnum_update == NULL) {
1091                 ctdb_db->seqnum_update =
1092                         event_add_timed(ctdb->ev, ctdb_db, 
1093                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1094                                         ctdb_ltdb_seqnum_check, ctdb_db);
1095         }
1096
1097         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1098         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1099         return 0;
1100 }
1101
1102 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1103 {
1104         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1105         struct ctdb_db_context *ctdb_db;
1106
1107         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1108         if (!ctdb_db) {
1109                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1110                 return -1;
1111         }
1112
1113         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1114                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1115                 return -1;
1116         }
1117
1118         ctdb_db->priority = db_prio->priority;
1119         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1120
1121         return 0;
1122 }
1123
1124