Fix a severe recovery bug that can lead to data corruption for SMB clients.
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tdb/include/tdb.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
28 #include "db_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
33
34 /**
35  * write a record to a normal database
36  *
37  * This is the server-variant of the ctdb_ltdb_store function.
38  * It contains logic to determine whether a record should be
39  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
40  * controls to the local ctdb daemon if apporpriate.
41  */
42 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
43                                   TDB_DATA key,
44                                   struct ctdb_ltdb_header *header,
45                                   TDB_DATA data)
46 {
47         struct ctdb_context *ctdb = ctdb_db->ctdb;
48         TDB_DATA rec;
49         int ret;
50         bool seqnum_suppressed = false;
51         bool keep = false;
52         bool schedule_for_deletion = false;
53         bool remove_from_delete_queue = false;
54         uint32_t lmaster;
55
56         if (ctdb->flags & CTDB_FLAG_TORTURE) {
57                 struct ctdb_ltdb_header *h2;
58                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
59                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
60                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
61                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
62                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
63                 }
64                 if (rec.dptr) free(rec.dptr);
65         }
66
67         if (ctdb->vnn_map == NULL) {
68                 /*
69                  * Called from a client: always store the record
70                  * Also don't call ctdb_lmaster since it uses the vnn_map!
71                  */
72                 keep = true;
73                 goto store;
74         }
75
76         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
77
78         /*
79          * If we migrate an empty record off to another node
80          * and the record has not been migrated with data,
81          * delete the record instead of storing the empty record.
82          */
83         if (data.dsize != 0) {
84                 keep = true;
85         } else if (header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)) {
86                 keep = true;
87         } else if (ctdb_db->persistent) {
88                 keep = true;
89         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
90                 /*
91                  * The record is not created by the client but
92                  * automatically by the ctdb_ltdb_fetch logic that
93                  * creates a record with an initial header in the
94                  * ltdb before trying to migrate the record from
95                  * the current lmaster. Keep it instead of trying
96                  * to delete the non-existing record...
97                  */
98                 keep = true;
99                 schedule_for_deletion = true;
100         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
101                 keep = true;
102         } else if (ctdb_db->ctdb->pnn == lmaster) {
103                 /*
104                  * If we are lmaster, then we usually keep the record.
105                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
106                  * and the record is empty and has never been migrated
107                  * with data, then we should delete it instead of storing it.
108                  * This is part of the vacuuming process.
109                  *
110                  * The reason that we usually need to store even empty records
111                  * on the lmaster is that a client operating directly on the
112                  * lmaster (== dmaster) expects the local copy of the record to
113                  * exist after successful ctdb migrate call. If the record does
114                  * not exist, the client goes into a migrate loop and eventually
115                  * fails. So storing the empty record makes sure that we do not
116                  * need to change the client code.
117                  */
118                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
119                         keep = true;
120                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
121                         keep = true;
122                 }
123         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
124                 keep = true;
125         }
126
127         if (keep) {
128                 if (!ctdb_db->persistent &&
129                     (ctdb_db->ctdb->pnn == header->dmaster) &&
130                     !(header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)))
131                 {
132                         header->rsn++;
133
134                         if (data.dsize == 0) {
135                                 schedule_for_deletion = true;
136                         }
137                 }
138                 remove_from_delete_queue = !schedule_for_deletion;
139         }
140
141 store:
142         /*
143          * The VACUUM_MIGRATED flag is only set temporarily for
144          * the above logic when the record was retrieved by a
145          * VACUUM_MIGRATE call and should not be stored in the
146          * database.
147          *
148          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
149          * and there are two cases in which the corresponding record
150          * is stored in the local database:
151          * 1. The record has been migrated with data in the past
152          *    (the MIGRATED_WITH_DATA record flag is set).
153          * 2. The record has been filled with data again since it
154          *    had been submitted in the VACUUM_FETCH message to the
155          *    lmaster.
156          * For such records it is important to not store the
157          * VACUUM_MIGRATED flag in the database.
158          */
159         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
160
161         /*
162          * Similarly, clear the AUTOMATIC flag which should not enter
163          * the local database copy since this would require client
164          * modifications to clear the flag when the client stores
165          * the record.
166          */
167         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
168
169         rec.dsize = sizeof(*header) + data.dsize;
170         rec.dptr = talloc_size(ctdb, rec.dsize);
171         CTDB_NO_MEMORY(ctdb, rec.dptr);
172
173         memcpy(rec.dptr, header, sizeof(*header));
174         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
175
176         /* Databases with seqnum updates enabled only get their seqnum
177            changes when/if we modify the data */
178         if (ctdb_db->seqnum_update != NULL) {
179                 TDB_DATA old;
180                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
181
182                 if ( (old.dsize == rec.dsize)
183                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
184                           rec.dptr+sizeof(struct ctdb_ltdb_header),
185                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
186                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
187                         seqnum_suppressed = true;
188                 }
189                 if (old.dptr) free(old.dptr);
190         }
191
192         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
193                             ctdb_db->db_name,
194                             keep?"storing":"deleting",
195                             ctdb_hash(&key)));
196
197         if (keep) {
198                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
199         } else {
200                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
201         }
202
203         if (ret != 0) {
204                 int lvl = DEBUG_ERR;
205
206                 if (keep == false &&
207                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
208                 {
209                         lvl = DEBUG_DEBUG;
210                 }
211
212                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
213                             "%d - %s\n",
214                             ctdb_db->db_name,
215                             keep?"store":"delete", ret,
216                             tdb_errorstr(ctdb_db->ltdb->tdb)));
217
218                 schedule_for_deletion = false;
219                 remove_from_delete_queue = false;
220         }
221         if (seqnum_suppressed) {
222                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
223         }
224
225         talloc_free(rec.dptr);
226
227         if (schedule_for_deletion) {
228                 int ret2;
229                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
230                 if (ret2 != 0) {
231                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
232                 }
233         }
234
235         if (remove_from_delete_queue) {
236                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
237         }
238
239         return ret;
240 }
241
/*
  state kept for a request that is waiting for a record chainlock
 */
struct lock_fetch_state {
	/* daemon context; its vnn_map generation is checked on completion */
	struct ctdb_context *ctdb;
	/* function the deferred packet is handed to once the lock wait ends */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	/* first argument passed to recv_pkt */
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* vnn_map generation at the time the request was deferred */
	uint32_t generation;
	/* when true, requeue the packet even if the generation has changed */
	bool ignore_generation;
};
250
251 /*
252   called when we should retry the operation
253  */
254 static void lock_fetch_callback(void *p, bool locked)
255 {
256         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
257         if (!state->ignore_generation &&
258             state->generation != state->ctdb->vnn_map->generation) {
259                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
260                 talloc_free(state->hdr);
261                 return;
262         }
263         state->recv_pkt(state->recv_context, state->hdr);
264         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
265 }
266
267
268 /*
269   do a non-blocking ltdb_lock, deferring this ctdb request until we
270   have the chainlock
271
272   It does the following:
273
274    1) tries to get the chainlock. If it succeeds, then it returns 0
275
276    2) if it fails to get a chainlock immediately then it sets up a
277    non-blocking chainlock via ctdb_lock_record, and when it gets the
278    chainlock it re-submits this ctdb request to the main packet
279    receive function.
280
281    This effectively queues all ctdb requests that cannot be
282    immediately satisfied until it can get the lock. This means that
283    the main ctdb daemon will not block waiting for a chainlock held by
284    a client
285
286    There are 3 possible return values:
287
288        0:    means that it got the lock immediately.
289       -1:    means that it failed to get the lock, and won't retry
290       -2:    means that it failed to get the lock immediately, but will retry
291  */
292 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
293                            TDB_DATA key, struct ctdb_req_header *hdr,
294                            void (*recv_pkt)(void *, struct ctdb_req_header *),
295                            void *recv_context, bool ignore_generation)
296 {
297         int ret;
298         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
299         struct lock_request *lreq;
300         struct lock_fetch_state *state;
301         
302         ret = tdb_chainlock_nonblock(tdb, key);
303
304         if (ret != 0 &&
305             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
306                 /* a hard failure - don't try again */
307                 return -1;
308         }
309
310         /* when torturing, ensure we test the contended path */
311         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
312             random() % 5 == 0) {
313                 ret = -1;
314                 tdb_chainunlock(tdb, key);
315         }
316
317         /* first the non-contended path */
318         if (ret == 0) {
319                 return 0;
320         }
321
322         state = talloc(hdr, struct lock_fetch_state);
323         state->ctdb = ctdb_db->ctdb;
324         state->hdr = hdr;
325         state->recv_pkt = recv_pkt;
326         state->recv_context = recv_context;
327         state->generation = ctdb_db->ctdb->vnn_map->generation;
328         state->ignore_generation = ignore_generation;
329
330         /* now the contended path */
331         lreq = ctdb_lock_record(ctdb_db, key, true, lock_fetch_callback, state);
332         if (lreq == NULL) {
333                 return -1;
334         }
335
336         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
337            so it won't be freed yet */
338         talloc_steal(state, hdr);
339
340         /* now tell the caller than we will retry asynchronously */
341         return -2;
342 }
343
344 /*
345   a varient of ctdb_ltdb_lock_requeue that also fetches the record
346  */
347 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
348                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
349                                  struct ctdb_req_header *hdr, TDB_DATA *data,
350                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
351                                  void *recv_context, bool ignore_generation)
352 {
353         int ret;
354
355         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
356                                      recv_context, ignore_generation);
357         if (ret == 0) {
358                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
359                 if (ret != 0) {
360                         int uret;
361                         uret = ctdb_ltdb_unlock(ctdb_db, key);
362                         if (uret != 0) {
363                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
364                         }
365                 }
366         }
367         return ret;
368 }
369
370
371 /*
372   paraoid check to see if the db is empty
373  */
374 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
375 {
376         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
377         int count = tdb_traverse_read(tdb, NULL, NULL);
378         if (count != 0) {
379                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
380                          ctdb_db->db_path));
381                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
382         }
383 }
384
385 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
386                                 struct ctdb_db_context *ctdb_db)
387 {
388         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
389         char *old;
390         char *reason = NULL;
391         TDB_DATA key;
392         TDB_DATA val;
393
394         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
395         key.dsize = strlen(ctdb_db->db_name);
396
397         old = ctdb_db->unhealthy_reason;
398         ctdb_db->unhealthy_reason = NULL;
399
400         val = tdb_fetch(tdb, key);
401         if (val.dsize > 0) {
402                 reason = talloc_strndup(ctdb_db,
403                                         (const char *)val.dptr,
404                                         val.dsize);
405                 if (reason == NULL) {
406                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
407                                            (int)val.dsize));
408                         ctdb_db->unhealthy_reason = old;
409                         free(val.dptr);
410                         return -1;
411                 }
412         }
413
414         if (val.dptr) {
415                 free(val.dptr);
416         }
417
418         talloc_free(old);
419         ctdb_db->unhealthy_reason = reason;
420         return 0;
421 }
422
423 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
424                                   struct ctdb_db_context *ctdb_db,
425                                   const char *given_reason,/* NULL means healthy */
426                                   int num_healthy_nodes)
427 {
428         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
429         int ret;
430         TDB_DATA key;
431         TDB_DATA val;
432         char *new_reason = NULL;
433         char *old_reason = NULL;
434
435         ret = tdb_transaction_start(tdb);
436         if (ret != 0) {
437                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
438                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
439                 return -1;
440         }
441
442         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
443         if (ret != 0) {
444                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
445                                    ctdb_db->db_name, ret));
446                 return -1;
447         }
448         old_reason = ctdb_db->unhealthy_reason;
449
450         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
451         key.dsize = strlen(ctdb_db->db_name);
452
453         if (given_reason) {
454                 new_reason = talloc_strdup(ctdb_db, given_reason);
455                 if (new_reason == NULL) {
456                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
457                                           given_reason));
458                         return -1;
459                 }
460         } else if (old_reason && num_healthy_nodes == 0) {
461                 /*
462                  * If the reason indicates ok, but there where no healthy nodes
463                  * available, that it means, we have not recovered valid content
464                  * of the db. So if there's an old reason, prefix it with
465                  * "NO-HEALTHY-NODES - "
466                  */
467                 const char *prefix;
468
469 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
470                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
471                 if (ret != 0) {
472                         prefix = _TMP_PREFIX;
473                 } else {
474                         prefix = "";
475                 }
476                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
477                                          prefix, old_reason);
478                 if (new_reason == NULL) {
479                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
480                                           prefix, old_reason));
481                         return -1;
482                 }
483 #undef _TMP_PREFIX
484         }
485
486         if (new_reason) {
487                 val.dptr = discard_const_p(uint8_t, new_reason);
488                 val.dsize = strlen(new_reason);
489
490                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
491                 if (ret != 0) {
492                         tdb_transaction_cancel(tdb);
493                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
494                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
495                                            ret, tdb_errorstr(tdb)));
496                         talloc_free(new_reason);
497                         return -1;
498                 }
499                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
500                                    ctdb_db->db_name, new_reason));
501         } else if (old_reason) {
502                 ret = tdb_delete(tdb, key);
503                 if (ret != 0) {
504                         tdb_transaction_cancel(tdb);
505                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
506                                            tdb_name(tdb), ctdb_db->db_name,
507                                            ret, tdb_errorstr(tdb)));
508                         talloc_free(new_reason);
509                         return -1;
510                 }
511                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
512                                    ctdb_db->db_name));
513         }
514
515         ret = tdb_transaction_commit(tdb);
516         if (ret != TDB_SUCCESS) {
517                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
518                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
519                 talloc_free(new_reason);
520                 return -1;
521         }
522
523         talloc_free(old_reason);
524         ctdb_db->unhealthy_reason = new_reason;
525
526         return 0;
527 }
528
529 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
530                                      struct ctdb_db_context *ctdb_db)
531 {
532         time_t now = time(NULL);
533         char *new_path;
534         char *new_reason;
535         int ret;
536         struct tm *tm;
537
538         tm = gmtime(&now);
539
540         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
541         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
542                                    "%04u%02u%02u%02u%02u%02u.0Z",
543                                    ctdb_db->db_path,
544                                    tm->tm_year+1900, tm->tm_mon+1,
545                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
546                                    tm->tm_sec);
547         if (new_path == NULL) {
548                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
549                 return -1;
550         }
551
552         new_reason = talloc_asprintf(ctdb_db,
553                                      "ERROR - Backup of corrupted TDB in '%s'",
554                                      new_path);
555         if (new_reason == NULL) {
556                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
557                 return -1;
558         }
559         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
560         talloc_free(new_reason);
561         if (ret != 0) {
562                 DEBUG(DEBUG_CRIT,(__location__
563                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
564                                  ctdb_db->db_path));
565                 return -1;
566         }
567
568         ret = rename(ctdb_db->db_path, new_path);
569         if (ret != 0) {
570                 DEBUG(DEBUG_CRIT,(__location__
571                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
572                                   ctdb_db->db_path, new_path,
573                                   errno, strerror(errno)));
574                 talloc_free(new_path);
575                 return -1;
576         }
577
578         DEBUG(DEBUG_CRIT,(__location__
579                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
580                          ctdb_db->db_path, new_path));
581         talloc_free(new_path);
582         return 0;
583 }
584
585 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
586 {
587         struct ctdb_db_context *ctdb_db;
588         int ret;
589         int ok = 0;
590         int fail = 0;
591
592         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
593                 if (!ctdb_db->persistent) {
594                         continue;
595                 }
596
597                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
598                 if (ret != 0) {
599                         DEBUG(DEBUG_ALERT,(__location__
600                                            " load persistent health for '%s' failed\n",
601                                            ctdb_db->db_path));
602                         return -1;
603                 }
604
605                 if (ctdb_db->unhealthy_reason == NULL) {
606                         ok++;
607                         DEBUG(DEBUG_INFO,(__location__
608                                    " persistent db '%s' healthy\n",
609                                    ctdb_db->db_path));
610                         continue;
611                 }
612
613                 fail++;
614                 DEBUG(DEBUG_ALERT,(__location__
615                                    " persistent db '%s' unhealthy: %s\n",
616                                    ctdb_db->db_path,
617                                    ctdb_db->unhealthy_reason));
618         }
619         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
620               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
621                ok, fail));
622
623         if (fail != 0) {
624                 return -1;
625         }
626
627         return 0;
628 }
629
630
631 /*
632   mark a database - as healthy
633  */
634 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
635 {
636         uint32_t db_id = *(uint32_t *)indata.dptr;
637         struct ctdb_db_context *ctdb_db;
638         int ret;
639         bool may_recover = false;
640
641         ctdb_db = find_ctdb_db(ctdb, db_id);
642         if (!ctdb_db) {
643                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
644                 return -1;
645         }
646
647         if (ctdb_db->unhealthy_reason) {
648                 may_recover = true;
649         }
650
651         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
652         if (ret != 0) {
653                 DEBUG(DEBUG_ERR,(__location__
654                                  " ctdb_update_persistent_health(%s) failed\n",
655                                  ctdb_db->db_name));
656                 return -1;
657         }
658
659         if (may_recover && !ctdb->done_startup) {
660                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
661                                   ctdb_db->db_name));
662                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
663         }
664
665         return 0;
666 }
667
668 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
669                                    TDB_DATA indata,
670                                    TDB_DATA *outdata)
671 {
672         uint32_t db_id = *(uint32_t *)indata.dptr;
673         struct ctdb_db_context *ctdb_db;
674         int ret;
675
676         ctdb_db = find_ctdb_db(ctdb, db_id);
677         if (!ctdb_db) {
678                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
679                 return -1;
680         }
681
682         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
683         if (ret != 0) {
684                 DEBUG(DEBUG_ERR,(__location__
685                                  " ctdb_load_persistent_health(%s) failed\n",
686                                  ctdb_db->db_name));
687                 return -1;
688         }
689
690         *outdata = tdb_null;
691         if (ctdb_db->unhealthy_reason) {
692                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
693                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
694         }
695
696         return 0;
697 }
698
699
700 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
701 {
702         char *ropath;
703
704         if (ctdb_db->readonly) {
705                 return 0;
706         }
707
708         if (ctdb_db->persistent) {
709                 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
710                 return -1;
711         }
712
713         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
714         if (ropath == NULL) {
715                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
716                 return -1;
717         }
718         ctdb_db->rottdb = tdb_open(ropath, 
719                               ctdb->tunable.database_hash_size, 
720                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
721                               O_CREAT|O_RDWR, 0);
722         if (ctdb_db->rottdb == NULL) {
723                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
724                 talloc_free(ropath);
725                 return -1;
726         }
727
728         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
729
730         ctdb_db->readonly = true;
731         talloc_free(ropath);
732         return 0;
733 }
734
735 /*
736   attach to a database, handling both persistent and non-persistent databases
737   return 0 on success, -1 on failure
738  */
739 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
740                              bool persistent, const char *unhealthy_reason,
741                              bool jenkinshash)
742 {
743         struct ctdb_db_context *ctdb_db, *tmp_db;
744         int ret;
745         struct TDB_DATA key;
746         unsigned tdb_flags;
747         int mode = 0600;
748         int remaining_tries = 0;
749
750         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
751         CTDB_NO_MEMORY(ctdb, ctdb_db);
752
753         ctdb_db->priority = 1;
754         ctdb_db->ctdb = ctdb;
755         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
756         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
757
758         key.dsize = strlen(db_name)+1;
759         key.dptr  = discard_const(db_name);
760         ctdb_db->db_id = ctdb_hash(&key);
761         ctdb_db->persistent = persistent;
762
763         if (!ctdb_db->persistent) {
764                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
765                 if (ctdb_db->delete_queue == NULL) {
766                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
767                 }
768
769                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
770         }
771
772         /* check for hash collisions */
773         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
774                 if (tmp_db->db_id == ctdb_db->db_id) {
775                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
776                                  tmp_db->db_id, db_name, tmp_db->db_name));
777                         talloc_free(ctdb_db);
778                         return -1;
779                 }
780         }
781
782         if (persistent) {
783                 if (unhealthy_reason) {
784                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
785                                                             unhealthy_reason, 0);
786                         if (ret != 0) {
787                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
788                                                    ctdb_db->db_name, unhealthy_reason, ret));
789                                 talloc_free(ctdb_db);
790                                 return -1;
791                         }
792                 }
793
794                 if (ctdb->max_persistent_check_errors > 0) {
795                         remaining_tries = 1;
796                 }
797                 if (ctdb->done_startup) {
798                         remaining_tries = 0;
799                 }
800
801                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
802                 if (ret != 0) {
803                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
804                                    ctdb_db->db_name, ret));
805                         talloc_free(ctdb_db);
806                         return -1;
807                 }
808         }
809
810         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
811                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
812                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
813                 talloc_free(ctdb_db);
814                 return -1;
815         }
816
817         if (ctdb_db->unhealthy_reason) {
818                 /* this is just a warning, but we want that in the log file! */
819                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
820                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
821         }
822
823         /* open the database */
824         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
825                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
826                                            db_name, ctdb->pnn);
827
828         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
829         if (ctdb->valgrinding) {
830                 tdb_flags |= TDB_NOMMAP;
831         }
832         tdb_flags |= TDB_DISALLOW_NESTING;
833         if (jenkinshash) {
834                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
835         }
836
837 again:
838         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
839                                       ctdb->tunable.database_hash_size, 
840                                       tdb_flags, 
841                                       O_CREAT|O_RDWR, mode);
842         if (ctdb_db->ltdb == NULL) {
843                 struct stat st;
844                 int saved_errno = errno;
845
846                 if (!persistent) {
847                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
848                                           ctdb_db->db_path,
849                                           saved_errno,
850                                           strerror(saved_errno)));
851                         talloc_free(ctdb_db);
852                         return -1;
853                 }
854
855                 if (remaining_tries == 0) {
856                         DEBUG(DEBUG_CRIT,(__location__
857                                           "Failed to open persistent tdb '%s': %d - %s\n",
858                                           ctdb_db->db_path,
859                                           saved_errno,
860                                           strerror(saved_errno)));
861                         talloc_free(ctdb_db);
862                         return -1;
863                 }
864
865                 ret = stat(ctdb_db->db_path, &st);
866                 if (ret != 0) {
867                         DEBUG(DEBUG_CRIT,(__location__
868                                           "Failed to open persistent tdb '%s': %d - %s\n",
869                                           ctdb_db->db_path,
870                                           saved_errno,
871                                           strerror(saved_errno)));
872                         talloc_free(ctdb_db);
873                         return -1;
874                 }
875
876                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
877                 if (ret != 0) {
878                         DEBUG(DEBUG_CRIT,(__location__
879                                           "Failed to open persistent tdb '%s': %d - %s\n",
880                                           ctdb_db->db_path,
881                                           saved_errno,
882                                           strerror(saved_errno)));
883                         talloc_free(ctdb_db);
884                         return -1;
885                 }
886
887                 remaining_tries--;
888                 mode = st.st_mode;
889                 goto again;
890         }
891
892         if (!persistent) {
893                 ctdb_check_db_empty(ctdb_db);
894         } else {
895                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
896                 if (ret != 0) {
897                         int fd;
898                         struct stat st;
899
900                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
901                                           ctdb_db->db_path, ret,
902                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
903                         if (remaining_tries == 0) {
904                                 talloc_free(ctdb_db);
905                                 return -1;
906                         }
907
908                         fd = tdb_fd(ctdb_db->ltdb->tdb);
909                         ret = fstat(fd, &st);
910                         if (ret != 0) {
911                                 DEBUG(DEBUG_CRIT,(__location__
912                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
913                                                   ctdb_db->db_path,
914                                                   errno,
915                                                   strerror(errno)));
916                                 talloc_free(ctdb_db);
917                                 return -1;
918                         }
919
920                         /* close the TDB */
921                         talloc_free(ctdb_db->ltdb);
922                         ctdb_db->ltdb = NULL;
923
924                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
925                         if (ret != 0) {
926                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
927                                                   ctdb_db->db_path));
928                                 talloc_free(ctdb_db);
929                                 return -1;
930                         }
931
932                         remaining_tries--;
933                         mode = st.st_mode;
934                         goto again;
935                 }
936         }
937
938         /* set up a rb tree we can use to track which records we have a 
939            fetch-lock in-flight for so we can defer any additional calls
940            for the same record.
941          */
942         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
943         if (ctdb_db->deferred_fetch == NULL) {
944                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
945                 talloc_free(ctdb_db);
946                 return -1;
947         }
948
949         DLIST_ADD(ctdb->db_list, ctdb_db);
950
951         /* setting this can help some high churn databases */
952         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
953
954         /* 
955            all databases support the "null" function. we need this in
956            order to do forced migration of records
957         */
958         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
959         if (ret != 0) {
960                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
961                 talloc_free(ctdb_db);
962                 return -1;
963         }
964
965         /* 
966            all databases support the "fetch" function. we need this
967            for efficient Samba3 ctdb fetch
968         */
969         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
970         if (ret != 0) {
971                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
972                 talloc_free(ctdb_db);
973                 return -1;
974         }
975
976         /* 
977            all databases support the "fetch_with_header" function. we need this
978            for efficient readonly record fetches
979         */
980         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
981         if (ret != 0) {
982                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
983                 talloc_free(ctdb_db);
984                 return -1;
985         }
986
987         ret = ctdb_vacuum_init(ctdb_db);
988         if (ret != 0) {
989                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
990                                   "database '%s'\n", ctdb_db->db_name));
991                 talloc_free(ctdb_db);
992                 return -1;
993         }
994
995
996         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
997         
998         /* success */
999         return 0;
1000 }
1001
1002
/*
  state for a client db attach request that has been put on hold;
  entries are chained on ctdb->deferred_attach
 */
struct ctdb_deferred_attach_context {
        /* list linkage used by the DLIST_* macros */
        struct ctdb_deferred_attach_context *next;
        struct ctdb_deferred_attach_context *prev;
        /* daemon context whose deferred_attach list holds this entry */
        struct ctdb_context *ctdb;
        /* the held attach control packet */
        struct ctdb_req_control *c;
};
1008
1009
1010 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1011 {
1012         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1013
1014         return 0;
1015 }
1016
1017 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1018 {
1019         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1020         struct ctdb_context *ctdb = da_ctx->ctdb;
1021
1022         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1023         talloc_free(da_ctx);
1024 }
1025
1026 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1027 {
1028         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1029         struct ctdb_context *ctdb = da_ctx->ctdb;
1030
1031         /* This talloc-steals the packet ->c */
1032         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1033         talloc_free(da_ctx);
1034 }
1035
1036 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1037 {
1038         struct ctdb_deferred_attach_context *da_ctx;
1039
1040         /* call it from the main event loop as soon as the current event 
1041            finishes.
1042          */
1043         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1044                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1045                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1046         }
1047
1048         return 0;
1049 }
1050
1051 /*
1052   a client has asked to attach a new database
1053  */
1054 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1055                                TDB_DATA *outdata, uint64_t tdb_flags, 
1056                                bool persistent, uint32_t client_id,
1057                                struct ctdb_req_control *c,
1058                                bool *async_reply)
1059 {
1060         const char *db_name = (const char *)indata.dptr;
1061         struct ctdb_db_context *db;
1062         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1063         struct ctdb_client *client = NULL;
1064
1065         if (ctdb->tunable.allow_client_db_attach == 0) {
1066                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1067                                   "AllowClientDBAccess == 0\n", db_name));
1068                 return -1;
1069         }
1070
1071         /* dont allow any local clients to attach while we are in recovery mode
1072          * except for the recovery daemon.
1073          * allow all attach from the network since these are always from remote
1074          * recovery daemons.
1075          */
1076         if (client_id != 0) {
1077                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1078         }
1079         if (client != NULL) {
1080                 /* If the node is inactive it is not part of the cluster
1081                    and we should not allow clients to attach to any
1082                    databases
1083                 */
1084                 if (node->flags & NODE_FLAGS_INACTIVE) {
1085                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1086                         return -1;
1087                 }
1088
1089                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1090                  && client->pid != ctdb->recoverd_pid
1091                  && !ctdb->done_startup) {
1092                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1093
1094                         if (da_ctx == NULL) {
1095                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1096                                 return -1;
1097                         }
1098
1099                         da_ctx->ctdb = ctdb;
1100                         da_ctx->c = talloc_steal(da_ctx, c);
1101                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1102                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1103
1104                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1105
1106                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1107                         *async_reply = true;
1108                         return 0;
1109                 }
1110         }
1111
1112         /* the client can optionally pass additional tdb flags, but we
1113            only allow a subset of those on the database in ctdb. Note
1114            that tdb_flags is passed in via the (otherwise unused)
1115            srvid to the attach control */
1116         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1117
1118         /* see if we already have this name */
1119         db = ctdb_db_handle(ctdb, db_name);
1120         if (db) {
1121                 if (db->persistent != persistent) {
1122                         DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1123                                           "database %s\n", persistent ? "" : "non-",
1124                                           db-> persistent ? "" : "non-", db_name));
1125                         return -1;
1126                 }
1127                 outdata->dptr  = (uint8_t *)&db->db_id;
1128                 outdata->dsize = sizeof(db->db_id);
1129                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1130                 return 0;
1131         }
1132
1133         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1134                 return -1;
1135         }
1136
1137         db = ctdb_db_handle(ctdb, db_name);
1138         if (!db) {
1139                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1140                 return -1;
1141         }
1142
1143         /* remember the flags the client has specified */
1144         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1145
1146         outdata->dptr  = (uint8_t *)&db->db_id;
1147         outdata->dsize = sizeof(db->db_id);
1148
1149         /* Try to ensure it's locked in mem */
1150         ctdb_lockdown_memory(ctdb);
1151
1152         /* tell all the other nodes about this database */
1153         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1154                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1155                                                 CTDB_CONTROL_DB_ATTACH,
1156                                  0, CTDB_CTRL_FLAG_NOREPLY,
1157                                  indata, NULL, NULL);
1158
1159         /* success */
1160         return 0;
1161 }
1162
1163
1164 /*
1165   attach to all existing persistent databases
1166  */
1167 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1168                                   const char *unhealthy_reason)
1169 {
1170         DIR *d;
1171         struct dirent *de;
1172
1173         /* open the persistent db directory and scan it for files */
1174         d = opendir(ctdb->db_directory_persistent);
1175         if (d == NULL) {
1176                 return 0;
1177         }
1178
1179         while ((de=readdir(d))) {
1180                 char *p, *s, *q;
1181                 size_t len = strlen(de->d_name);
1182                 uint32_t node;
1183                 int invalid_name = 0;
1184                 
1185                 s = talloc_strdup(ctdb, de->d_name);
1186                 CTDB_NO_MEMORY(ctdb, s);
1187
1188                 /* only accept names ending in .tdb */
1189                 p = strstr(s, ".tdb.");
1190                 if (len < 7 || p == NULL) {
1191                         talloc_free(s);
1192                         continue;
1193                 }
1194
1195                 /* only accept names ending with .tdb. and any number of digits */
1196                 q = p+5;
1197                 while (*q != 0 && invalid_name == 0) {
1198                         if (!isdigit(*q++)) {
1199                                 invalid_name = 1;
1200                         }
1201                 }
1202                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1203                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1204                         talloc_free(s);
1205                         continue;
1206                 }
1207                 p[4] = 0;
1208
1209                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1210                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1211                         closedir(d);
1212                         talloc_free(s);
1213                         return -1;
1214                 }
1215
1216                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1217
1218                 talloc_free(s);
1219         }
1220         closedir(d);
1221         return 0;
1222 }
1223
1224 int ctdb_attach_databases(struct ctdb_context *ctdb)
1225 {
1226         int ret;
1227         char *persistent_health_path = NULL;
1228         char *unhealthy_reason = NULL;
1229         bool first_try = true;
1230
1231         if (ctdb->db_directory == NULL) {
1232                 ctdb->db_directory = VARDIR "/ctdb";
1233         }
1234         if (ctdb->db_directory_persistent == NULL) {
1235                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1236         }
1237         if (ctdb->db_directory_state == NULL) {
1238                 ctdb->db_directory_state = VARDIR "/ctdb/state";
1239         }
1240
1241         /* make sure the db directory exists */
1242         ret = mkdir(ctdb->db_directory, 0700);
1243         if (ret == -1 && errno != EEXIST) {
1244                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1245                          ctdb->db_directory));
1246                 return -1;
1247         }
1248
1249         /* make sure the persistent db directory exists */
1250         ret = mkdir(ctdb->db_directory_persistent, 0700);
1251         if (ret == -1 && errno != EEXIST) {
1252                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1253                          ctdb->db_directory_persistent));
1254                 return -1;
1255         }
1256
1257         /* make sure the internal state db directory exists */
1258         ret = mkdir(ctdb->db_directory_state, 0700);
1259         if (ret == -1 && errno != EEXIST) {
1260                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1261                          ctdb->db_directory_state));
1262                 return -1;
1263         }
1264
1265         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1266                                                  ctdb->db_directory_state,
1267                                                  PERSISTENT_HEALTH_TDB,
1268                                                  ctdb->pnn);
1269         if (persistent_health_path == NULL) {
1270                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1271                 return -1;
1272         }
1273
1274 again:
1275
1276         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1277                                                    0, TDB_DISALLOW_NESTING,
1278                                                    O_CREAT | O_RDWR, 0600);
1279         if (ctdb->db_persistent_health == NULL) {
1280                 struct tdb_wrap *tdb;
1281
1282                 if (!first_try) {
1283                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1284                                           persistent_health_path,
1285                                           errno,
1286                                           strerror(errno)));
1287                         talloc_free(persistent_health_path);
1288                         talloc_free(unhealthy_reason);
1289                         return -1;
1290                 }
1291                 first_try = false;
1292
1293                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1294                                                    persistent_health_path,
1295                                                    "was cleared after a failure",
1296                                                    "manual verification needed");
1297                 if (unhealthy_reason == NULL) {
1298                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1299                         talloc_free(persistent_health_path);
1300                         return -1;
1301                 }
1302
1303                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1304                                   persistent_health_path));
1305                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1306                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1307                                     O_CREAT | O_RDWR, 0600);
1308                 if (tdb) {
1309                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1310                                           persistent_health_path,
1311                                           errno,
1312                                           strerror(errno)));
1313                         talloc_free(persistent_health_path);
1314                         talloc_free(unhealthy_reason);
1315                         return -1;
1316                 }
1317
1318                 talloc_free(tdb);
1319                 goto again;
1320         }
1321         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1322         if (ret != 0) {
1323                 struct tdb_wrap *tdb;
1324
1325                 talloc_free(ctdb->db_persistent_health);
1326                 ctdb->db_persistent_health = NULL;
1327
1328                 if (!first_try) {
1329                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1330                                           persistent_health_path));
1331                         talloc_free(persistent_health_path);
1332                         talloc_free(unhealthy_reason);
1333                         return -1;
1334                 }
1335                 first_try = false;
1336
1337                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1338                                                    persistent_health_path,
1339                                                    "was cleared after a failure",
1340                                                    "manual verification needed");
1341                 if (unhealthy_reason == NULL) {
1342                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1343                         talloc_free(persistent_health_path);
1344                         return -1;
1345                 }
1346
1347                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1348                                   persistent_health_path));
1349                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1350                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1351                                     O_CREAT | O_RDWR, 0600);
1352                 if (tdb) {
1353                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1354                                           persistent_health_path,
1355                                           errno,
1356                                           strerror(errno)));
1357                         talloc_free(persistent_health_path);
1358                         talloc_free(unhealthy_reason);
1359                         return -1;
1360                 }
1361
1362                 talloc_free(tdb);
1363                 goto again;
1364         }
1365         talloc_free(persistent_health_path);
1366
1367         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1368         talloc_free(unhealthy_reason);
1369         if (ret != 0) {
1370                 return ret;
1371         }
1372
1373         return 0;
1374 }
1375
1376 /*
1377   called when a broadcast seqnum update comes in
1378  */
1379 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1380 {
1381         struct ctdb_db_context *ctdb_db;
1382         if (srcnode == ctdb->pnn) {
1383                 /* don't update ourselves! */
1384                 return 0;
1385         }
1386
1387         ctdb_db = find_ctdb_db(ctdb, db_id);
1388         if (!ctdb_db) {
1389                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1390                 return -1;
1391         }
1392
1393         if (ctdb_db->unhealthy_reason) {
1394                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1395                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1396                 return -1;
1397         }
1398
1399         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1400         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1401         return 0;
1402 }
1403
1404 /*
1405   timer to check for seqnum changes in a ltdb and propogate them
1406  */
1407 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1408                                    struct timeval t, void *p)
1409 {
1410         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1411         struct ctdb_context *ctdb = ctdb_db->ctdb;
1412         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1413         if (new_seqnum != ctdb_db->seqnum) {
1414                 /* something has changed - propogate it */
1415                 TDB_DATA data;
1416                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1417                 data.dsize = sizeof(uint32_t);
1418                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1419                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1420                                          data, NULL, NULL);             
1421         }
1422         ctdb_db->seqnum = new_seqnum;
1423
1424         /* setup a new timer */
1425         ctdb_db->seqnum_update =
1426                 event_add_timed(ctdb->ev, ctdb_db, 
1427                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1428                                 ctdb_ltdb_seqnum_check, ctdb_db);
1429 }
1430
1431 /*
1432   enable seqnum handling on this db
1433  */
1434 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1435 {
1436         struct ctdb_db_context *ctdb_db;
1437         ctdb_db = find_ctdb_db(ctdb, db_id);
1438         if (!ctdb_db) {
1439                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1440                 return -1;
1441         }
1442
1443         if (ctdb_db->seqnum_update == NULL) {
1444                 ctdb_db->seqnum_update =
1445                         event_add_timed(ctdb->ev, ctdb_db, 
1446                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1447                                         ctdb_ltdb_seqnum_check, ctdb_db);
1448         }
1449
1450         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1451         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1452         return 0;
1453 }
1454
1455 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1456 {
1457         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1458         struct ctdb_db_context *ctdb_db;
1459
1460         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1461         if (!ctdb_db) {
1462                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1463                 return 0;
1464         }
1465
1466         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1467                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1468                 return 0;
1469         }
1470
1471         ctdb_db->priority = db_prio->priority;
1472         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1473
1474         return 0;
1475 }
1476
1477
1478 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1479 {
1480
1481         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1482
1483         if (ctdb_db->sticky) {
1484                 return 0;
1485         }
1486
1487         if (ctdb_db->persistent) {
1488                 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1489                 return -1;
1490         }
1491
1492         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1493
1494         ctdb_db->sticky = true;
1495
1496         return 0;
1497 }
1498
1499 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1500                                 uint32_t db_id,
1501                                 TDB_DATA *outdata)
1502 {
1503         struct ctdb_db_context *ctdb_db;
1504         struct ctdb_db_statistics_wire *stats;
1505         int i;
1506         int len;
1507         char *ptr;
1508
1509         ctdb_db = find_ctdb_db(ctdb, db_id);
1510         if (!ctdb_db) {
1511                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1512                 return -1;
1513         }
1514
1515         len = offsetof(struct ctdb_db_statistics_wire, hot_keys);
1516         for (i = 0; i < MAX_HOT_KEYS; i++) {
1517                 len += 8 + ctdb_db->statistics.hot_keys[i].key.dsize;
1518         }
1519
1520         stats = talloc_size(outdata, len);
1521         if (stats == NULL) {
1522                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics wire structure\n"));
1523                 return -1;
1524         }
1525
1526         stats->db_ro_delegations = ctdb_db->statistics.db_ro_delegations;
1527         stats->db_ro_revokes     = ctdb_db->statistics.db_ro_revokes;
1528         for (i = 0; i < MAX_COUNT_BUCKETS; i++) {
1529                 stats->hop_count_bucket[i] = ctdb_db->statistics.hop_count_bucket[i];
1530         }
1531         stats->num_hot_keys = MAX_HOT_KEYS;
1532
1533         ptr = &stats->hot_keys[0];
1534         for (i = 0; i < MAX_HOT_KEYS; i++) {
1535                 *(uint32_t *)ptr = ctdb_db->statistics.hot_keys[i].count;
1536                 ptr += 4;
1537
1538                 *(uint32_t *)ptr = ctdb_db->statistics.hot_keys[i].key.dsize;
1539                 ptr += 4;
1540
1541                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr, ctdb_db->statistics.hot_keys[i].key.dsize);
1542                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1543         }
1544
1545         outdata->dptr  = (uint8_t *)stats;
1546         outdata->dsize = len;
1547
1548         return 0;
1549 }