ctdbd: Replace lockwait with locking API and remove ctdb_lockwait.c
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tdb/include/tdb.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
28 #include "db_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
33
34 /**
35  * write a record to a normal database
36  *
37  * This is the server-variant of the ctdb_ltdb_store function.
38  * It contains logic to determine whether a record should be
39  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
40  * controls to the local ctdb daemon if apporpriate.
41  */
42 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
43                                   TDB_DATA key,
44                                   struct ctdb_ltdb_header *header,
45                                   TDB_DATA data)
46 {
47         struct ctdb_context *ctdb = ctdb_db->ctdb;
48         TDB_DATA rec;
49         int ret;
50         bool seqnum_suppressed = false;
51         bool keep = false;
52         bool schedule_for_deletion = false;
53         bool remove_from_delete_queue = false;
54         uint32_t lmaster;
55
56         if (ctdb->flags & CTDB_FLAG_TORTURE) {
57                 struct ctdb_ltdb_header *h2;
58                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
59                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
60                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
61                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
62                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
63                 }
64                 if (rec.dptr) free(rec.dptr);
65         }
66
67         if (ctdb->vnn_map == NULL) {
68                 /*
69                  * Called from a client: always store the record
70                  * Also don't call ctdb_lmaster since it uses the vnn_map!
71                  */
72                 keep = true;
73                 goto store;
74         }
75
76         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
77
78         /*
79          * If we migrate an empty record off to another node
80          * and the record has not been migrated with data,
81          * delete the record instead of storing the empty record.
82          */
83         if (data.dsize != 0) {
84                 keep = true;
85         } else if (header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)) {
86                 keep = true;
87         } else if (ctdb_db->persistent) {
88                 keep = true;
89         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
90                 /*
91                  * The record is not created by the client but
92                  * automatically by the ctdb_ltdb_fetch logic that
93                  * creates a record with an initial header in the
94                  * ltdb before trying to migrate the record from
95                  * the current lmaster. Keep it instead of trying
96                  * to delete the non-existing record...
97                  */
98                 keep = true;
99                 schedule_for_deletion = true;
100         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
101                 keep = true;
102         } else if (ctdb_db->ctdb->pnn == lmaster) {
103                 /*
104                  * If we are lmaster, then we usually keep the record.
105                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
106                  * and the record is empty and has never been migrated
107                  * with data, then we should delete it instead of storing it.
108                  * This is part of the vacuuming process.
109                  *
110                  * The reason that we usually need to store even empty records
111                  * on the lmaster is that a client operating directly on the
112                  * lmaster (== dmaster) expects the local copy of the record to
113                  * exist after successful ctdb migrate call. If the record does
114                  * not exist, the client goes into a migrate loop and eventually
115                  * fails. So storing the empty record makes sure that we do not
116                  * need to change the client code.
117                  */
118                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
119                         keep = true;
120                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
121                         keep = true;
122                 }
123         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
124                 keep = true;
125         }
126
127         if (keep) {
128                 if ((data.dsize == 0) &&
129                     !ctdb_db->persistent &&
130                     (ctdb_db->ctdb->pnn == header->dmaster) &&
131                     !(header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)))
132                 {
133                         schedule_for_deletion = true;
134                 }
135                 remove_from_delete_queue = !schedule_for_deletion;
136         }
137
138 store:
139         /*
140          * The VACUUM_MIGRATED flag is only set temporarily for
141          * the above logic when the record was retrieved by a
142          * VACUUM_MIGRATE call and should not be stored in the
143          * database.
144          *
145          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
146          * and there are two cases in which the corresponding record
147          * is stored in the local database:
148          * 1. The record has been migrated with data in the past
149          *    (the MIGRATED_WITH_DATA record flag is set).
150          * 2. The record has been filled with data again since it
151          *    had been submitted in the VACUUM_FETCH message to the
152          *    lmaster.
153          * For such records it is important to not store the
154          * VACUUM_MIGRATED flag in the database.
155          */
156         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
157
158         /*
159          * Similarly, clear the AUTOMATIC flag which should not enter
160          * the local database copy since this would require client
161          * modifications to clear the flag when the client stores
162          * the record.
163          */
164         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
165
166         rec.dsize = sizeof(*header) + data.dsize;
167         rec.dptr = talloc_size(ctdb, rec.dsize);
168         CTDB_NO_MEMORY(ctdb, rec.dptr);
169
170         memcpy(rec.dptr, header, sizeof(*header));
171         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
172
173         /* Databases with seqnum updates enabled only get their seqnum
174            changes when/if we modify the data */
175         if (ctdb_db->seqnum_update != NULL) {
176                 TDB_DATA old;
177                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
178
179                 if ( (old.dsize == rec.dsize)
180                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
181                           rec.dptr+sizeof(struct ctdb_ltdb_header),
182                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
183                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
184                         seqnum_suppressed = true;
185                 }
186                 if (old.dptr) free(old.dptr);
187         }
188
189         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
190                             ctdb_db->db_name,
191                             keep?"storing":"deleting",
192                             ctdb_hash(&key)));
193
194         if (keep) {
195                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
196         } else {
197                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
198         }
199
200         if (ret != 0) {
201                 int lvl = DEBUG_ERR;
202
203                 if (keep == false &&
204                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
205                 {
206                         lvl = DEBUG_DEBUG;
207                 }
208
209                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
210                             "%d - %s\n",
211                             ctdb_db->db_name,
212                             keep?"store":"delete", ret,
213                             tdb_errorstr(ctdb_db->ltdb->tdb)));
214
215                 schedule_for_deletion = false;
216                 remove_from_delete_queue = false;
217         }
218         if (seqnum_suppressed) {
219                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
220         }
221
222         talloc_free(rec.dptr);
223
224         if (schedule_for_deletion) {
225                 int ret2;
226                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
227                 if (ret2 != 0) {
228                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
229                 }
230         }
231
232         if (remove_from_delete_queue) {
233                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
234         }
235
236         return ret;
237 }
238
239 struct lock_fetch_state {
240         struct ctdb_context *ctdb;
241         void (*recv_pkt)(void *, struct ctdb_req_header *);
242         void *recv_context;
243         struct ctdb_req_header *hdr;
244         uint32_t generation;
245         bool ignore_generation;
246 };
247
248 /*
249   called when we should retry the operation
250  */
251 static void lock_fetch_callback(void *p, bool locked)
252 {
253         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
254         if (!state->ignore_generation &&
255             state->generation != state->ctdb->vnn_map->generation) {
256                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
257                 talloc_free(state->hdr);
258                 return;
259         }
260         state->recv_pkt(state->recv_context, state->hdr);
261         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
262 }
263
264
265 /*
266   do a non-blocking ltdb_lock, deferring this ctdb request until we
267   have the chainlock
268
269   It does the following:
270
271    1) tries to get the chainlock. If it succeeds, then it returns 0
272
273    2) if it fails to get a chainlock immediately then it sets up a
274    non-blocking chainlock via ctdb_lock_record, and when it gets the
275    chainlock it re-submits this ctdb request to the main packet
276    receive function.
277
278    This effectively queues all ctdb requests that cannot be
279    immediately satisfied until it can get the lock. This means that
280    the main ctdb daemon will not block waiting for a chainlock held by
281    a client
282
283    There are 3 possible return values:
284
285        0:    means that it got the lock immediately.
286       -1:    means that it failed to get the lock, and won't retry
287       -2:    means that it failed to get the lock immediately, but will retry
288  */
289 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
290                            TDB_DATA key, struct ctdb_req_header *hdr,
291                            void (*recv_pkt)(void *, struct ctdb_req_header *),
292                            void *recv_context, bool ignore_generation)
293 {
294         int ret;
295         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
296         struct lock_request *lreq;
297         struct lock_fetch_state *state;
298         
299         ret = tdb_chainlock_nonblock(tdb, key);
300
301         if (ret != 0 &&
302             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
303                 /* a hard failure - don't try again */
304                 return -1;
305         }
306
307         /* when torturing, ensure we test the contended path */
308         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
309             random() % 5 == 0) {
310                 ret = -1;
311                 tdb_chainunlock(tdb, key);
312         }
313
314         /* first the non-contended path */
315         if (ret == 0) {
316                 return 0;
317         }
318
319         state = talloc(hdr, struct lock_fetch_state);
320         state->ctdb = ctdb_db->ctdb;
321         state->hdr = hdr;
322         state->recv_pkt = recv_pkt;
323         state->recv_context = recv_context;
324         state->generation = ctdb_db->ctdb->vnn_map->generation;
325         state->ignore_generation = ignore_generation;
326
327         /* now the contended path */
328         lreq = ctdb_lock_record(ctdb_db, key, true, lock_fetch_callback, state);
329         if (lreq == NULL) {
330                 return -1;
331         }
332
333         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
334            so it won't be freed yet */
335         talloc_steal(state, hdr);
336
337         /* now tell the caller than we will retry asynchronously */
338         return -2;
339 }
340
341 /*
342   a varient of ctdb_ltdb_lock_requeue that also fetches the record
343  */
344 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
345                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
346                                  struct ctdb_req_header *hdr, TDB_DATA *data,
347                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
348                                  void *recv_context, bool ignore_generation)
349 {
350         int ret;
351
352         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
353                                      recv_context, ignore_generation);
354         if (ret == 0) {
355                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
356                 if (ret != 0) {
357                         int uret;
358                         uret = ctdb_ltdb_unlock(ctdb_db, key);
359                         if (uret != 0) {
360                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
361                         }
362                 }
363         }
364         return ret;
365 }
366
367
368 /*
369   paraoid check to see if the db is empty
370  */
371 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
372 {
373         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
374         int count = tdb_traverse_read(tdb, NULL, NULL);
375         if (count != 0) {
376                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
377                          ctdb_db->db_path));
378                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
379         }
380 }
381
382 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
383                                 struct ctdb_db_context *ctdb_db)
384 {
385         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
386         char *old;
387         char *reason = NULL;
388         TDB_DATA key;
389         TDB_DATA val;
390
391         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
392         key.dsize = strlen(ctdb_db->db_name);
393
394         old = ctdb_db->unhealthy_reason;
395         ctdb_db->unhealthy_reason = NULL;
396
397         val = tdb_fetch(tdb, key);
398         if (val.dsize > 0) {
399                 reason = talloc_strndup(ctdb_db,
400                                         (const char *)val.dptr,
401                                         val.dsize);
402                 if (reason == NULL) {
403                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
404                                            (int)val.dsize));
405                         ctdb_db->unhealthy_reason = old;
406                         free(val.dptr);
407                         return -1;
408                 }
409         }
410
411         if (val.dptr) {
412                 free(val.dptr);
413         }
414
415         talloc_free(old);
416         ctdb_db->unhealthy_reason = reason;
417         return 0;
418 }
419
420 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
421                                   struct ctdb_db_context *ctdb_db,
422                                   const char *given_reason,/* NULL means healthy */
423                                   int num_healthy_nodes)
424 {
425         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
426         int ret;
427         TDB_DATA key;
428         TDB_DATA val;
429         char *new_reason = NULL;
430         char *old_reason = NULL;
431
432         ret = tdb_transaction_start(tdb);
433         if (ret != 0) {
434                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
435                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
436                 return -1;
437         }
438
439         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
440         if (ret != 0) {
441                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
442                                    ctdb_db->db_name, ret));
443                 return -1;
444         }
445         old_reason = ctdb_db->unhealthy_reason;
446
447         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
448         key.dsize = strlen(ctdb_db->db_name);
449
450         if (given_reason) {
451                 new_reason = talloc_strdup(ctdb_db, given_reason);
452                 if (new_reason == NULL) {
453                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
454                                           given_reason));
455                         return -1;
456                 }
457         } else if (old_reason && num_healthy_nodes == 0) {
458                 /*
459                  * If the reason indicates ok, but there where no healthy nodes
460                  * available, that it means, we have not recovered valid content
461                  * of the db. So if there's an old reason, prefix it with
462                  * "NO-HEALTHY-NODES - "
463                  */
464                 const char *prefix;
465
466 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
467                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
468                 if (ret != 0) {
469                         prefix = _TMP_PREFIX;
470                 } else {
471                         prefix = "";
472                 }
473                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
474                                          prefix, old_reason);
475                 if (new_reason == NULL) {
476                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
477                                           prefix, old_reason));
478                         return -1;
479                 }
480 #undef _TMP_PREFIX
481         }
482
483         if (new_reason) {
484                 val.dptr = discard_const_p(uint8_t, new_reason);
485                 val.dsize = strlen(new_reason);
486
487                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
488                 if (ret != 0) {
489                         tdb_transaction_cancel(tdb);
490                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
491                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
492                                            ret, tdb_errorstr(tdb)));
493                         talloc_free(new_reason);
494                         return -1;
495                 }
496                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
497                                    ctdb_db->db_name, new_reason));
498         } else if (old_reason) {
499                 ret = tdb_delete(tdb, key);
500                 if (ret != 0) {
501                         tdb_transaction_cancel(tdb);
502                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
503                                            tdb_name(tdb), ctdb_db->db_name,
504                                            ret, tdb_errorstr(tdb)));
505                         talloc_free(new_reason);
506                         return -1;
507                 }
508                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
509                                    ctdb_db->db_name));
510         }
511
512         ret = tdb_transaction_commit(tdb);
513         if (ret != TDB_SUCCESS) {
514                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
515                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
516                 talloc_free(new_reason);
517                 return -1;
518         }
519
520         talloc_free(old_reason);
521         ctdb_db->unhealthy_reason = new_reason;
522
523         return 0;
524 }
525
526 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
527                                      struct ctdb_db_context *ctdb_db)
528 {
529         time_t now = time(NULL);
530         char *new_path;
531         char *new_reason;
532         int ret;
533         struct tm *tm;
534
535         tm = gmtime(&now);
536
537         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
538         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
539                                    "%04u%02u%02u%02u%02u%02u.0Z",
540                                    ctdb_db->db_path,
541                                    tm->tm_year+1900, tm->tm_mon+1,
542                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
543                                    tm->tm_sec);
544         if (new_path == NULL) {
545                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
546                 return -1;
547         }
548
549         new_reason = talloc_asprintf(ctdb_db,
550                                      "ERROR - Backup of corrupted TDB in '%s'",
551                                      new_path);
552         if (new_reason == NULL) {
553                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
554                 return -1;
555         }
556         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
557         talloc_free(new_reason);
558         if (ret != 0) {
559                 DEBUG(DEBUG_CRIT,(__location__
560                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
561                                  ctdb_db->db_path));
562                 return -1;
563         }
564
565         ret = rename(ctdb_db->db_path, new_path);
566         if (ret != 0) {
567                 DEBUG(DEBUG_CRIT,(__location__
568                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
569                                   ctdb_db->db_path, new_path,
570                                   errno, strerror(errno)));
571                 talloc_free(new_path);
572                 return -1;
573         }
574
575         DEBUG(DEBUG_CRIT,(__location__
576                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
577                          ctdb_db->db_path, new_path));
578         talloc_free(new_path);
579         return 0;
580 }
581
582 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
583 {
584         struct ctdb_db_context *ctdb_db;
585         int ret;
586         int ok = 0;
587         int fail = 0;
588
589         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
590                 if (!ctdb_db->persistent) {
591                         continue;
592                 }
593
594                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
595                 if (ret != 0) {
596                         DEBUG(DEBUG_ALERT,(__location__
597                                            " load persistent health for '%s' failed\n",
598                                            ctdb_db->db_path));
599                         return -1;
600                 }
601
602                 if (ctdb_db->unhealthy_reason == NULL) {
603                         ok++;
604                         DEBUG(DEBUG_INFO,(__location__
605                                    " persistent db '%s' healthy\n",
606                                    ctdb_db->db_path));
607                         continue;
608                 }
609
610                 fail++;
611                 DEBUG(DEBUG_ALERT,(__location__
612                                    " persistent db '%s' unhealthy: %s\n",
613                                    ctdb_db->db_path,
614                                    ctdb_db->unhealthy_reason));
615         }
616         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
617               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
618                ok, fail));
619
620         if (fail != 0) {
621                 return -1;
622         }
623
624         return 0;
625 }
626
627
628 /*
629   mark a database - as healthy
630  */
631 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
632 {
633         uint32_t db_id = *(uint32_t *)indata.dptr;
634         struct ctdb_db_context *ctdb_db;
635         int ret;
636         bool may_recover = false;
637
638         ctdb_db = find_ctdb_db(ctdb, db_id);
639         if (!ctdb_db) {
640                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
641                 return -1;
642         }
643
644         if (ctdb_db->unhealthy_reason) {
645                 may_recover = true;
646         }
647
648         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
649         if (ret != 0) {
650                 DEBUG(DEBUG_ERR,(__location__
651                                  " ctdb_update_persistent_health(%s) failed\n",
652                                  ctdb_db->db_name));
653                 return -1;
654         }
655
656         if (may_recover && !ctdb->done_startup) {
657                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
658                                   ctdb_db->db_name));
659                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
660         }
661
662         return 0;
663 }
664
665 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
666                                    TDB_DATA indata,
667                                    TDB_DATA *outdata)
668 {
669         uint32_t db_id = *(uint32_t *)indata.dptr;
670         struct ctdb_db_context *ctdb_db;
671         int ret;
672
673         ctdb_db = find_ctdb_db(ctdb, db_id);
674         if (!ctdb_db) {
675                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
676                 return -1;
677         }
678
679         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
680         if (ret != 0) {
681                 DEBUG(DEBUG_ERR,(__location__
682                                  " ctdb_load_persistent_health(%s) failed\n",
683                                  ctdb_db->db_name));
684                 return -1;
685         }
686
687         *outdata = tdb_null;
688         if (ctdb_db->unhealthy_reason) {
689                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
690                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
691         }
692
693         return 0;
694 }
695
696
697 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
698 {
699         char *ropath;
700
701         if (ctdb_db->readonly) {
702                 return 0;
703         }
704
705         if (ctdb_db->persistent) {
706                 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
707                 return -1;
708         }
709
710         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
711         if (ropath == NULL) {
712                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
713                 return -1;
714         }
715         ctdb_db->rottdb = tdb_open(ropath, 
716                               ctdb->tunable.database_hash_size, 
717                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
718                               O_CREAT|O_RDWR, 0);
719         if (ctdb_db->rottdb == NULL) {
720                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
721                 talloc_free(ropath);
722                 return -1;
723         }
724
725         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
726
727         ctdb_db->readonly = true;
728         talloc_free(ropath);
729         return 0;
730 }
731
732 /*
733   attach to a database, handling both persistent and non-persistent databases
734   return 0 on success, -1 on failure
735  */
736 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
737                              bool persistent, const char *unhealthy_reason,
738                              bool jenkinshash)
739 {
740         struct ctdb_db_context *ctdb_db, *tmp_db;
741         int ret;
742         struct TDB_DATA key;
743         unsigned tdb_flags;
744         int mode = 0600;
745         int remaining_tries = 0;
746
747         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
748         CTDB_NO_MEMORY(ctdb, ctdb_db);
749
750         ctdb_db->priority = 1;
751         ctdb_db->ctdb = ctdb;
752         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
753         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
754
755         key.dsize = strlen(db_name)+1;
756         key.dptr  = discard_const(db_name);
757         ctdb_db->db_id = ctdb_hash(&key);
758         ctdb_db->persistent = persistent;
759
760         if (!ctdb_db->persistent) {
761                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
762                 if (ctdb_db->delete_queue == NULL) {
763                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
764                 }
765
766                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
767         }
768
769         /* check for hash collisions */
770         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
771                 if (tmp_db->db_id == ctdb_db->db_id) {
772                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
773                                  tmp_db->db_id, db_name, tmp_db->db_name));
774                         talloc_free(ctdb_db);
775                         return -1;
776                 }
777         }
778
779         if (persistent) {
780                 if (unhealthy_reason) {
781                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
782                                                             unhealthy_reason, 0);
783                         if (ret != 0) {
784                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
785                                                    ctdb_db->db_name, unhealthy_reason, ret));
786                                 talloc_free(ctdb_db);
787                                 return -1;
788                         }
789                 }
790
791                 if (ctdb->max_persistent_check_errors > 0) {
792                         remaining_tries = 1;
793                 }
794                 if (ctdb->done_startup) {
795                         remaining_tries = 0;
796                 }
797
798                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
799                 if (ret != 0) {
800                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
801                                    ctdb_db->db_name, ret));
802                         talloc_free(ctdb_db);
803                         return -1;
804                 }
805         }
806
807         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
808                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
809                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
810                 talloc_free(ctdb_db);
811                 return -1;
812         }
813
814         if (ctdb_db->unhealthy_reason) {
815                 /* this is just a warning, but we want that in the log file! */
816                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
817                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
818         }
819
820         /* open the database */
821         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
822                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
823                                            db_name, ctdb->pnn);
824
825         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
826         if (ctdb->valgrinding) {
827                 tdb_flags |= TDB_NOMMAP;
828         }
829         tdb_flags |= TDB_DISALLOW_NESTING;
830         if (jenkinshash) {
831                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
832         }
833
834 again:
835         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
836                                       ctdb->tunable.database_hash_size, 
837                                       tdb_flags, 
838                                       O_CREAT|O_RDWR, mode);
839         if (ctdb_db->ltdb == NULL) {
840                 struct stat st;
841                 int saved_errno = errno;
842
843                 if (!persistent) {
844                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
845                                           ctdb_db->db_path,
846                                           saved_errno,
847                                           strerror(saved_errno)));
848                         talloc_free(ctdb_db);
849                         return -1;
850                 }
851
852                 if (remaining_tries == 0) {
853                         DEBUG(DEBUG_CRIT,(__location__
854                                           "Failed to open persistent tdb '%s': %d - %s\n",
855                                           ctdb_db->db_path,
856                                           saved_errno,
857                                           strerror(saved_errno)));
858                         talloc_free(ctdb_db);
859                         return -1;
860                 }
861
862                 ret = stat(ctdb_db->db_path, &st);
863                 if (ret != 0) {
864                         DEBUG(DEBUG_CRIT,(__location__
865                                           "Failed to open persistent tdb '%s': %d - %s\n",
866                                           ctdb_db->db_path,
867                                           saved_errno,
868                                           strerror(saved_errno)));
869                         talloc_free(ctdb_db);
870                         return -1;
871                 }
872
873                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
874                 if (ret != 0) {
875                         DEBUG(DEBUG_CRIT,(__location__
876                                           "Failed to open persistent tdb '%s': %d - %s\n",
877                                           ctdb_db->db_path,
878                                           saved_errno,
879                                           strerror(saved_errno)));
880                         talloc_free(ctdb_db);
881                         return -1;
882                 }
883
884                 remaining_tries--;
885                 mode = st.st_mode;
886                 goto again;
887         }
888
889         if (!persistent) {
890                 ctdb_check_db_empty(ctdb_db);
891         } else {
892                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
893                 if (ret != 0) {
894                         int fd;
895                         struct stat st;
896
897                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
898                                           ctdb_db->db_path, ret,
899                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
900                         if (remaining_tries == 0) {
901                                 talloc_free(ctdb_db);
902                                 return -1;
903                         }
904
905                         fd = tdb_fd(ctdb_db->ltdb->tdb);
906                         ret = fstat(fd, &st);
907                         if (ret != 0) {
908                                 DEBUG(DEBUG_CRIT,(__location__
909                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
910                                                   ctdb_db->db_path,
911                                                   errno,
912                                                   strerror(errno)));
913                                 talloc_free(ctdb_db);
914                                 return -1;
915                         }
916
917                         /* close the TDB */
918                         talloc_free(ctdb_db->ltdb);
919                         ctdb_db->ltdb = NULL;
920
921                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
922                         if (ret != 0) {
923                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
924                                                   ctdb_db->db_path));
925                                 talloc_free(ctdb_db);
926                                 return -1;
927                         }
928
929                         remaining_tries--;
930                         mode = st.st_mode;
931                         goto again;
932                 }
933         }
934
935         /* set up a rb tree we can use to track which records we have a 
936            fetch-lock in-flight for so we can defer any additional calls
937            for the same record.
938          */
939         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
940         if (ctdb_db->deferred_fetch == NULL) {
941                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
942                 talloc_free(ctdb_db);
943                 return -1;
944         }
945
946         DLIST_ADD(ctdb->db_list, ctdb_db);
947
948         /* setting this can help some high churn databases */
949         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
950
951         /* 
952            all databases support the "null" function. we need this in
953            order to do forced migration of records
954         */
955         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
956         if (ret != 0) {
957                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
958                 talloc_free(ctdb_db);
959                 return -1;
960         }
961
962         /* 
963            all databases support the "fetch" function. we need this
964            for efficient Samba3 ctdb fetch
965         */
966         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
967         if (ret != 0) {
968                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
969                 talloc_free(ctdb_db);
970                 return -1;
971         }
972
973         /* 
974            all databases support the "fetch_with_header" function. we need this
975            for efficient readonly record fetches
976         */
977         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
978         if (ret != 0) {
979                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
980                 talloc_free(ctdb_db);
981                 return -1;
982         }
983
984         ret = ctdb_vacuum_init(ctdb_db);
985         if (ret != 0) {
986                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
987                                   "database '%s'\n", ctdb_db->db_name));
988                 talloc_free(ctdb_db);
989                 return -1;
990         }
991
992
993         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
994         
995         /* success */
996         return 0;
997 }
998
999
/*
  state for one database attach control whose processing has been
  deferred; instances are chained on ctdb->deferred_attach
 */
struct ctdb_deferred_attach_context {
        /* doubly-linked list linkage (used by the DLIST_* macros) */
        struct ctdb_deferred_attach_context *next;
        struct ctdb_deferred_attach_context *prev;
        /* daemon context that owns the deferred_attach list */
        struct ctdb_context *ctdb;
        /* the deferred control request packet */
        struct ctdb_req_control *c;
};
1005
1006
1007 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1008 {
1009         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1010
1011         return 0;
1012 }
1013
1014 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1015 {
1016         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1017         struct ctdb_context *ctdb = da_ctx->ctdb;
1018
1019         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1020         talloc_free(da_ctx);
1021 }
1022
1023 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1024 {
1025         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1026         struct ctdb_context *ctdb = da_ctx->ctdb;
1027
1028         /* This talloc-steals the packet ->c */
1029         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1030         talloc_free(da_ctx);
1031 }
1032
1033 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1034 {
1035         struct ctdb_deferred_attach_context *da_ctx;
1036
1037         /* call it from the main event loop as soon as the current event 
1038            finishes.
1039          */
1040         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1041                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1042                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1043         }
1044
1045         return 0;
1046 }
1047
1048 /*
1049   a client has asked to attach a new database
1050  */
1051 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1052                                TDB_DATA *outdata, uint64_t tdb_flags, 
1053                                bool persistent, uint32_t client_id,
1054                                struct ctdb_req_control *c,
1055                                bool *async_reply)
1056 {
1057         const char *db_name = (const char *)indata.dptr;
1058         struct ctdb_db_context *db;
1059         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1060         struct ctdb_client *client = NULL;
1061
1062         if (ctdb->tunable.allow_client_db_attach == 0) {
1063                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1064                                   "AllowClientDBAccess == 0\n", db_name));
1065                 return -1;
1066         }
1067
1068         /* dont allow any local clients to attach while we are in recovery mode
1069          * except for the recovery daemon.
1070          * allow all attach from the network since these are always from remote
1071          * recovery daemons.
1072          */
1073         if (client_id != 0) {
1074                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1075         }
1076         if (client != NULL) {
1077                 /* If the node is inactive it is not part of the cluster
1078                    and we should not allow clients to attach to any
1079                    databases
1080                 */
1081                 if (node->flags & NODE_FLAGS_INACTIVE) {
1082                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1083                         return -1;
1084                 }
1085
1086                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1087                  && client->pid != ctdb->recoverd_pid
1088                  && !ctdb->done_startup) {
1089                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1090
1091                         if (da_ctx == NULL) {
1092                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1093                                 return -1;
1094                         }
1095
1096                         da_ctx->ctdb = ctdb;
1097                         da_ctx->c = talloc_steal(da_ctx, c);
1098                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1099                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1100
1101                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1102
1103                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1104                         *async_reply = true;
1105                         return 0;
1106                 }
1107         }
1108
1109         /* the client can optionally pass additional tdb flags, but we
1110            only allow a subset of those on the database in ctdb. Note
1111            that tdb_flags is passed in via the (otherwise unused)
1112            srvid to the attach control */
1113         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1114
1115         /* see if we already have this name */
1116         db = ctdb_db_handle(ctdb, db_name);
1117         if (db) {
1118                 if (db->persistent != persistent) {
1119                         DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1120                                           "database %s", persistent ? "" : "non-",
1121                                           db-> persistent ? "" : "non-", db_name));
1122                         return -1;
1123                 }
1124                 outdata->dptr  = (uint8_t *)&db->db_id;
1125                 outdata->dsize = sizeof(db->db_id);
1126                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1127                 return 0;
1128         }
1129
1130         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1131                 return -1;
1132         }
1133
1134         db = ctdb_db_handle(ctdb, db_name);
1135         if (!db) {
1136                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1137                 return -1;
1138         }
1139
1140         /* remember the flags the client has specified */
1141         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1142
1143         outdata->dptr  = (uint8_t *)&db->db_id;
1144         outdata->dsize = sizeof(db->db_id);
1145
1146         /* Try to ensure it's locked in mem */
1147         ctdb_lockdown_memory(ctdb);
1148
1149         /* tell all the other nodes about this database */
1150         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1151                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1152                                                 CTDB_CONTROL_DB_ATTACH,
1153                                  0, CTDB_CTRL_FLAG_NOREPLY,
1154                                  indata, NULL, NULL);
1155
1156         /* success */
1157         return 0;
1158 }
1159
1160
1161 /*
1162   attach to all existing persistent databases
1163  */
1164 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1165                                   const char *unhealthy_reason)
1166 {
1167         DIR *d;
1168         struct dirent *de;
1169
1170         /* open the persistent db directory and scan it for files */
1171         d = opendir(ctdb->db_directory_persistent);
1172         if (d == NULL) {
1173                 return 0;
1174         }
1175
1176         while ((de=readdir(d))) {
1177                 char *p, *s, *q;
1178                 size_t len = strlen(de->d_name);
1179                 uint32_t node;
1180                 int invalid_name = 0;
1181                 
1182                 s = talloc_strdup(ctdb, de->d_name);
1183                 CTDB_NO_MEMORY(ctdb, s);
1184
1185                 /* only accept names ending in .tdb */
1186                 p = strstr(s, ".tdb.");
1187                 if (len < 7 || p == NULL) {
1188                         talloc_free(s);
1189                         continue;
1190                 }
1191
1192                 /* only accept names ending with .tdb. and any number of digits */
1193                 q = p+5;
1194                 while (*q != 0 && invalid_name == 0) {
1195                         if (!isdigit(*q++)) {
1196                                 invalid_name = 1;
1197                         }
1198                 }
1199                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1200                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1201                         talloc_free(s);
1202                         continue;
1203                 }
1204                 p[4] = 0;
1205
1206                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1207                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1208                         closedir(d);
1209                         talloc_free(s);
1210                         return -1;
1211                 }
1212
1213                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1214
1215                 talloc_free(s);
1216         }
1217         closedir(d);
1218         return 0;
1219 }
1220
1221 int ctdb_attach_databases(struct ctdb_context *ctdb)
1222 {
1223         int ret;
1224         char *persistent_health_path = NULL;
1225         char *unhealthy_reason = NULL;
1226         bool first_try = true;
1227
1228         if (ctdb->db_directory == NULL) {
1229                 ctdb->db_directory = VARDIR "/ctdb";
1230         }
1231         if (ctdb->db_directory_persistent == NULL) {
1232                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1233         }
1234         if (ctdb->db_directory_state == NULL) {
1235                 ctdb->db_directory_state = VARDIR "/ctdb/state";
1236         }
1237
1238         /* make sure the db directory exists */
1239         ret = mkdir(ctdb->db_directory, 0700);
1240         if (ret == -1 && errno != EEXIST) {
1241                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1242                          ctdb->db_directory));
1243                 return -1;
1244         }
1245
1246         /* make sure the persistent db directory exists */
1247         ret = mkdir(ctdb->db_directory_persistent, 0700);
1248         if (ret == -1 && errno != EEXIST) {
1249                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1250                          ctdb->db_directory_persistent));
1251                 return -1;
1252         }
1253
1254         /* make sure the internal state db directory exists */
1255         ret = mkdir(ctdb->db_directory_state, 0700);
1256         if (ret == -1 && errno != EEXIST) {
1257                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1258                          ctdb->db_directory_state));
1259                 return -1;
1260         }
1261
1262         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1263                                                  ctdb->db_directory_state,
1264                                                  PERSISTENT_HEALTH_TDB,
1265                                                  ctdb->pnn);
1266         if (persistent_health_path == NULL) {
1267                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1268                 return -1;
1269         }
1270
1271 again:
1272
1273         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1274                                                    0, TDB_DISALLOW_NESTING,
1275                                                    O_CREAT | O_RDWR, 0600);
1276         if (ctdb->db_persistent_health == NULL) {
1277                 struct tdb_wrap *tdb;
1278
1279                 if (!first_try) {
1280                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1281                                           persistent_health_path,
1282                                           errno,
1283                                           strerror(errno)));
1284                         talloc_free(persistent_health_path);
1285                         talloc_free(unhealthy_reason);
1286                         return -1;
1287                 }
1288                 first_try = false;
1289
1290                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1291                                                    persistent_health_path,
1292                                                    "was cleared after a failure",
1293                                                    "manual verification needed");
1294                 if (unhealthy_reason == NULL) {
1295                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1296                         talloc_free(persistent_health_path);
1297                         return -1;
1298                 }
1299
1300                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1301                                   persistent_health_path));
1302                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1303                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1304                                     O_CREAT | O_RDWR, 0600);
1305                 if (tdb) {
1306                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1307                                           persistent_health_path,
1308                                           errno,
1309                                           strerror(errno)));
1310                         talloc_free(persistent_health_path);
1311                         talloc_free(unhealthy_reason);
1312                         return -1;
1313                 }
1314
1315                 talloc_free(tdb);
1316                 goto again;
1317         }
1318         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1319         if (ret != 0) {
1320                 struct tdb_wrap *tdb;
1321
1322                 talloc_free(ctdb->db_persistent_health);
1323                 ctdb->db_persistent_health = NULL;
1324
1325                 if (!first_try) {
1326                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1327                                           persistent_health_path));
1328                         talloc_free(persistent_health_path);
1329                         talloc_free(unhealthy_reason);
1330                         return -1;
1331                 }
1332                 first_try = false;
1333
1334                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1335                                                    persistent_health_path,
1336                                                    "was cleared after a failure",
1337                                                    "manual verification needed");
1338                 if (unhealthy_reason == NULL) {
1339                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1340                         talloc_free(persistent_health_path);
1341                         return -1;
1342                 }
1343
1344                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1345                                   persistent_health_path));
1346                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1347                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1348                                     O_CREAT | O_RDWR, 0600);
1349                 if (tdb) {
1350                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1351                                           persistent_health_path,
1352                                           errno,
1353                                           strerror(errno)));
1354                         talloc_free(persistent_health_path);
1355                         talloc_free(unhealthy_reason);
1356                         return -1;
1357                 }
1358
1359                 talloc_free(tdb);
1360                 goto again;
1361         }
1362         talloc_free(persistent_health_path);
1363
1364         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1365         talloc_free(unhealthy_reason);
1366         if (ret != 0) {
1367                 return ret;
1368         }
1369
1370         return 0;
1371 }
1372
1373 /*
1374   called when a broadcast seqnum update comes in
1375  */
1376 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1377 {
1378         struct ctdb_db_context *ctdb_db;
1379         if (srcnode == ctdb->pnn) {
1380                 /* don't update ourselves! */
1381                 return 0;
1382         }
1383
1384         ctdb_db = find_ctdb_db(ctdb, db_id);
1385         if (!ctdb_db) {
1386                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1387                 return -1;
1388         }
1389
1390         if (ctdb_db->unhealthy_reason) {
1391                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1392                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1393                 return -1;
1394         }
1395
1396         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1397         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1398         return 0;
1399 }
1400
1401 /*
1402   timer to check for seqnum changes in a ltdb and propogate them
1403  */
1404 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1405                                    struct timeval t, void *p)
1406 {
1407         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1408         struct ctdb_context *ctdb = ctdb_db->ctdb;
1409         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1410         if (new_seqnum != ctdb_db->seqnum) {
1411                 /* something has changed - propogate it */
1412                 TDB_DATA data;
1413                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1414                 data.dsize = sizeof(uint32_t);
1415                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1416                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1417                                          data, NULL, NULL);             
1418         }
1419         ctdb_db->seqnum = new_seqnum;
1420
1421         /* setup a new timer */
1422         ctdb_db->seqnum_update =
1423                 event_add_timed(ctdb->ev, ctdb_db, 
1424                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1425                                 ctdb_ltdb_seqnum_check, ctdb_db);
1426 }
1427
1428 /*
1429   enable seqnum handling on this db
1430  */
1431 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1432 {
1433         struct ctdb_db_context *ctdb_db;
1434         ctdb_db = find_ctdb_db(ctdb, db_id);
1435         if (!ctdb_db) {
1436                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1437                 return -1;
1438         }
1439
1440         if (ctdb_db->seqnum_update == NULL) {
1441                 ctdb_db->seqnum_update =
1442                         event_add_timed(ctdb->ev, ctdb_db, 
1443                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1444                                         ctdb_ltdb_seqnum_check, ctdb_db);
1445         }
1446
1447         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1448         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1449         return 0;
1450 }
1451
1452 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1453 {
1454         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1455         struct ctdb_db_context *ctdb_db;
1456
1457         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1458         if (!ctdb_db) {
1459                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1460                 return 0;
1461         }
1462
1463         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1464                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1465                 return 0;
1466         }
1467
1468         ctdb_db->priority = db_prio->priority;
1469         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1470
1471         return 0;
1472 }
1473
1474
1475 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1476 {
1477
1478         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1479
1480         if (ctdb_db->sticky) {
1481                 return 0;
1482         }
1483
1484         if (ctdb_db->persistent) {
1485                 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1486                 return -1;
1487         }
1488
1489         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1490
1491         ctdb_db->sticky = true;
1492
1493         return 0;
1494 }
1495
1496 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1497                                 uint32_t db_id,
1498                                 TDB_DATA *outdata)
1499 {
1500         struct ctdb_db_context *ctdb_db;
1501         struct ctdb_db_statistics_wire *stats;
1502         int i;
1503         int len;
1504         char *ptr;
1505
1506         ctdb_db = find_ctdb_db(ctdb, db_id);
1507         if (!ctdb_db) {
1508                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1509                 return -1;
1510         }
1511
1512         len = offsetof(struct ctdb_db_statistics_wire, hot_keys);
1513         for (i = 0; i < MAX_HOT_KEYS; i++) {
1514                 len += 8 + ctdb_db->statistics.hot_keys[i].key.dsize;
1515         }
1516
1517         stats = talloc_size(outdata, len);
1518         if (stats == NULL) {
1519                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics wire structure\n"));
1520                 return -1;
1521         }
1522
1523         stats->db_ro_delegations = ctdb_db->statistics.db_ro_delegations;
1524         stats->db_ro_revokes     = ctdb_db->statistics.db_ro_revokes;
1525         for (i = 0; i < MAX_COUNT_BUCKETS; i++) {
1526                 stats->hop_count_bucket[i] = ctdb_db->statistics.hop_count_bucket[i];
1527         }
1528         stats->num_hot_keys = MAX_HOT_KEYS;
1529
1530         ptr = &stats->hot_keys[0];
1531         for (i = 0; i < MAX_HOT_KEYS; i++) {
1532                 *(uint32_t *)ptr = ctdb_db->statistics.hot_keys[i].count;
1533                 ptr += 4;
1534
1535                 *(uint32_t *)ptr = ctdb_db->statistics.hot_keys[i].key.dsize;
1536                 ptr += 4;
1537
1538                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr, ctdb_db->statistics.hot_keys[i].key.dsize);
1539                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1540         }
1541
1542         outdata->dptr  = (uint8_t *)stats;
1543         outdata->dsize = len;
1544
1545         return 0;
1546 }