Merge remote branch 'amitay/tevent-sync'
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tdb/include/tdb.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
28 #include "db_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
33
34 /**
35  * write a record to a normal database
36  *
37  * This is the server-variant of the ctdb_ltdb_store function.
38  * It contains logic to determine whether a record should be
39  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
40  * controls to the local ctdb daemon if apporpriate.
41  */
42 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
43                                   TDB_DATA key,
44                                   struct ctdb_ltdb_header *header,
45                                   TDB_DATA data)
46 {
47         struct ctdb_context *ctdb = ctdb_db->ctdb;
48         TDB_DATA rec;
49         int ret;
50         bool seqnum_suppressed = false;
51         bool keep = false;
52         bool schedule_for_deletion = false;
53         bool remove_from_delete_queue = false;
54         uint32_t lmaster;
55
56         if (ctdb->flags & CTDB_FLAG_TORTURE) {
57                 struct ctdb_ltdb_header *h2;
58                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
59                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
60                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
61                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
62                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
63                 }
64                 if (rec.dptr) free(rec.dptr);
65         }
66
67         if (ctdb->vnn_map == NULL) {
68                 /*
69                  * Called from a client: always store the record
70                  * Also don't call ctdb_lmaster since it uses the vnn_map!
71                  */
72                 keep = true;
73                 goto store;
74         }
75
76         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
77
78         /*
79          * If we migrate an empty record off to another node
80          * and the record has not been migrated with data,
81          * delete the record instead of storing the empty record.
82          */
83         if (data.dsize != 0) {
84                 keep = true;
85         } else if (header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)) {
86                 keep = true;
87         } else if (ctdb_db->persistent) {
88                 keep = true;
89         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
90                 /*
91                  * The record is not created by the client but
92                  * automatically by the ctdb_ltdb_fetch logic that
93                  * creates a record with an initial header in the
94                  * ltdb before trying to migrate the record from
95                  * the current lmaster. Keep it instead of trying
96                  * to delete the non-existing record...
97                  */
98                 keep = true;
99                 schedule_for_deletion = true;
100         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
101                 keep = true;
102         } else if (ctdb_db->ctdb->pnn == lmaster) {
103                 /*
104                  * If we are lmaster, then we usually keep the record.
105                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
106                  * and the record is empty and has never been migrated
107                  * with data, then we should delete it instead of storing it.
108                  * This is part of the vacuuming process.
109                  *
110                  * The reason that we usually need to store even empty records
111                  * on the lmaster is that a client operating directly on the
112                  * lmaster (== dmaster) expects the local copy of the record to
113                  * exist after successful ctdb migrate call. If the record does
114                  * not exist, the client goes into a migrate loop and eventually
115                  * fails. So storing the empty record makes sure that we do not
116                  * need to change the client code.
117                  */
118                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
119                         keep = true;
120                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
121                         keep = true;
122                 }
123         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
124                 keep = true;
125         }
126
127         if (keep) {
128                 if ((data.dsize == 0) &&
129                     !ctdb_db->persistent &&
130                     (ctdb_db->ctdb->pnn == header->dmaster) &&
131                     !(header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)))
132                 {
133                         schedule_for_deletion = true;
134                 }
135                 remove_from_delete_queue = !schedule_for_deletion;
136         }
137
138 store:
139         /*
140          * The VACUUM_MIGRATED flag is only set temporarily for
141          * the above logic when the record was retrieved by a
142          * VACUUM_MIGRATE call and should not be stored in the
143          * database.
144          *
145          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
146          * and there are two cases in which the corresponding record
147          * is stored in the local database:
148          * 1. The record has been migrated with data in the past
149          *    (the MIGRATED_WITH_DATA record flag is set).
150          * 2. The record has been filled with data again since it
151          *    had been submitted in the VACUUM_FETCH message to the
152          *    lmaster.
153          * For such records it is important to not store the
154          * VACUUM_MIGRATED flag in the database.
155          */
156         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
157
158         /*
159          * Similarly, clear the AUTOMATIC flag which should not enter
160          * the local database copy since this would require client
161          * modifications to clear the flag when the client stores
162          * the record.
163          */
164         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
165
166         rec.dsize = sizeof(*header) + data.dsize;
167         rec.dptr = talloc_size(ctdb, rec.dsize);
168         CTDB_NO_MEMORY(ctdb, rec.dptr);
169
170         memcpy(rec.dptr, header, sizeof(*header));
171         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
172
173         /* Databases with seqnum updates enabled only get their seqnum
174            changes when/if we modify the data */
175         if (ctdb_db->seqnum_update != NULL) {
176                 TDB_DATA old;
177                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
178
179                 if ( (old.dsize == rec.dsize)
180                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
181                           rec.dptr+sizeof(struct ctdb_ltdb_header),
182                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
183                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
184                         seqnum_suppressed = true;
185                 }
186                 if (old.dptr) free(old.dptr);
187         }
188
189         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
190                             ctdb_db->db_name,
191                             keep?"storing":"deleting",
192                             ctdb_hash(&key)));
193
194         if (keep) {
195                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
196         } else {
197                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
198         }
199
200         if (ret != 0) {
201                 int lvl = DEBUG_ERR;
202
203                 if (keep == false &&
204                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
205                 {
206                         lvl = DEBUG_DEBUG;
207                 }
208
209                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
210                             "%d - %s\n",
211                             ctdb_db->db_name,
212                             keep?"store":"delete", ret,
213                             tdb_errorstr(ctdb_db->ltdb->tdb)));
214
215                 schedule_for_deletion = false;
216                 remove_from_delete_queue = false;
217         }
218         if (seqnum_suppressed) {
219                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
220         }
221
222         talloc_free(rec.dptr);
223
224         if (schedule_for_deletion) {
225                 int ret2;
226                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
227                 if (ret2 != 0) {
228                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
229                 }
230         }
231
232         if (remove_from_delete_queue) {
233                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
234         }
235
236         return ret;
237 }
238
/*
  bookkeeping for a request that is waiting for a chainlock and will be
  handed back to its packet handler once the lock wait completes
 */
struct lock_fetch_state {
        /* daemon context; provides the current vnn_map generation */
        struct ctdb_context *ctdb;

        /* handler the deferred packet is re-submitted to */
        void (*recv_pkt)(void *, struct ctdb_req_header *);

        /* first argument passed to recv_pkt */
        void *recv_context;

        /* the deferred request packet */
        struct ctdb_req_header *hdr;

        /* vnn_map generation recorded when the request was deferred */
        uint32_t generation;

        /* if true, requeue the packet even when the generation changed */
        bool ignore_generation;
};
247
248 /*
249   called when we should retry the operation
250  */
251 static void lock_fetch_callback(void *p)
252 {
253         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
254         if (!state->ignore_generation &&
255             state->generation != state->ctdb->vnn_map->generation) {
256                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
257                 talloc_free(state->hdr);
258                 return;
259         }
260         state->recv_pkt(state->recv_context, state->hdr);
261         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
262 }
263
264
265 /*
266   do a non-blocking ltdb_lock, deferring this ctdb request until we
267   have the chainlock
268
269   It does the following:
270
271    1) tries to get the chainlock. If it succeeds, then it returns 0
272
273    2) if it fails to get a chainlock immediately then it sets up a
274    non-blocking chainlock via ctdb_lockwait, and when it gets the
275    chainlock it re-submits this ctdb request to the main packet
276    receive function
277
278    This effectively queues all ctdb requests that cannot be
279    immediately satisfied until it can get the lock. This means that
280    the main ctdb daemon will not block waiting for a chainlock held by
281    a client
282
283    There are 3 possible return values:
284
285        0:    means that it got the lock immediately.
286       -1:    means that it failed to get the lock, and won't retry
287       -2:    means that it failed to get the lock immediately, but will retry
288  */
289 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
290                            TDB_DATA key, struct ctdb_req_header *hdr,
291                            void (*recv_pkt)(void *, struct ctdb_req_header *),
292                            void *recv_context, bool ignore_generation)
293 {
294         int ret;
295         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
296         struct lockwait_handle *h;
297         struct lock_fetch_state *state;
298         
299         ret = tdb_chainlock_nonblock(tdb, key);
300
301         if (ret != 0 &&
302             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
303                 /* a hard failure - don't try again */
304                 return -1;
305         }
306
307         /* when torturing, ensure we test the contended path */
308         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
309             random() % 5 == 0) {
310                 ret = -1;
311                 tdb_chainunlock(tdb, key);
312         }
313
314         /* first the non-contended path */
315         if (ret == 0) {
316                 return 0;
317         }
318
319         state = talloc(hdr, struct lock_fetch_state);
320         state->ctdb = ctdb_db->ctdb;
321         state->hdr = hdr;
322         state->recv_pkt = recv_pkt;
323         state->recv_context = recv_context;
324         state->generation = ctdb_db->ctdb->vnn_map->generation;
325         state->ignore_generation = ignore_generation;
326
327         /* now the contended path */
328         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
329         if (h == NULL) {
330                 return -1;
331         }
332
333         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
334            so it won't be freed yet */
335         talloc_steal(state, hdr);
336         talloc_steal(state, h);
337
338         /* now tell the caller than we will retry asynchronously */
339         return -2;
340 }
341
342 /*
343   a varient of ctdb_ltdb_lock_requeue that also fetches the record
344  */
345 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
346                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
347                                  struct ctdb_req_header *hdr, TDB_DATA *data,
348                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
349                                  void *recv_context, bool ignore_generation)
350 {
351         int ret;
352
353         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
354                                      recv_context, ignore_generation);
355         if (ret == 0) {
356                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
357                 if (ret != 0) {
358                         int uret;
359                         uret = ctdb_ltdb_unlock(ctdb_db, key);
360                         if (uret != 0) {
361                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
362                         }
363                 }
364         }
365         return ret;
366 }
367
368
369 /*
370   paraoid check to see if the db is empty
371  */
372 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
373 {
374         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
375         int count = tdb_traverse_read(tdb, NULL, NULL);
376         if (count != 0) {
377                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
378                          ctdb_db->db_path));
379                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
380         }
381 }
382
383 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
384                                 struct ctdb_db_context *ctdb_db)
385 {
386         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
387         char *old;
388         char *reason = NULL;
389         TDB_DATA key;
390         TDB_DATA val;
391
392         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
393         key.dsize = strlen(ctdb_db->db_name);
394
395         old = ctdb_db->unhealthy_reason;
396         ctdb_db->unhealthy_reason = NULL;
397
398         val = tdb_fetch(tdb, key);
399         if (val.dsize > 0) {
400                 reason = talloc_strndup(ctdb_db,
401                                         (const char *)val.dptr,
402                                         val.dsize);
403                 if (reason == NULL) {
404                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
405                                            (int)val.dsize));
406                         ctdb_db->unhealthy_reason = old;
407                         free(val.dptr);
408                         return -1;
409                 }
410         }
411
412         if (val.dptr) {
413                 free(val.dptr);
414         }
415
416         talloc_free(old);
417         ctdb_db->unhealthy_reason = reason;
418         return 0;
419 }
420
421 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
422                                   struct ctdb_db_context *ctdb_db,
423                                   const char *given_reason,/* NULL means healthy */
424                                   int num_healthy_nodes)
425 {
426         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
427         int ret;
428         TDB_DATA key;
429         TDB_DATA val;
430         char *new_reason = NULL;
431         char *old_reason = NULL;
432
433         ret = tdb_transaction_start(tdb);
434         if (ret != 0) {
435                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
436                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
437                 return -1;
438         }
439
440         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
441         if (ret != 0) {
442                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
443                                    ctdb_db->db_name, ret));
444                 return -1;
445         }
446         old_reason = ctdb_db->unhealthy_reason;
447
448         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
449         key.dsize = strlen(ctdb_db->db_name);
450
451         if (given_reason) {
452                 new_reason = talloc_strdup(ctdb_db, given_reason);
453                 if (new_reason == NULL) {
454                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
455                                           given_reason));
456                         return -1;
457                 }
458         } else if (old_reason && num_healthy_nodes == 0) {
459                 /*
460                  * If the reason indicates ok, but there where no healthy nodes
461                  * available, that it means, we have not recovered valid content
462                  * of the db. So if there's an old reason, prefix it with
463                  * "NO-HEALTHY-NODES - "
464                  */
465                 const char *prefix;
466
467 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
468                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
469                 if (ret != 0) {
470                         prefix = _TMP_PREFIX;
471                 } else {
472                         prefix = "";
473                 }
474                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
475                                          prefix, old_reason);
476                 if (new_reason == NULL) {
477                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
478                                           prefix, old_reason));
479                         return -1;
480                 }
481 #undef _TMP_PREFIX
482         }
483
484         if (new_reason) {
485                 val.dptr = discard_const_p(uint8_t, new_reason);
486                 val.dsize = strlen(new_reason);
487
488                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
489                 if (ret != 0) {
490                         tdb_transaction_cancel(tdb);
491                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
492                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
493                                            ret, tdb_errorstr(tdb)));
494                         talloc_free(new_reason);
495                         return -1;
496                 }
497                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
498                                    ctdb_db->db_name, new_reason));
499         } else if (old_reason) {
500                 ret = tdb_delete(tdb, key);
501                 if (ret != 0) {
502                         tdb_transaction_cancel(tdb);
503                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
504                                            tdb_name(tdb), ctdb_db->db_name,
505                                            ret, tdb_errorstr(tdb)));
506                         talloc_free(new_reason);
507                         return -1;
508                 }
509                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
510                                    ctdb_db->db_name));
511         }
512
513         ret = tdb_transaction_commit(tdb);
514         if (ret != TDB_SUCCESS) {
515                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
516                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
517                 talloc_free(new_reason);
518                 return -1;
519         }
520
521         talloc_free(old_reason);
522         ctdb_db->unhealthy_reason = new_reason;
523
524         return 0;
525 }
526
527 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
528                                      struct ctdb_db_context *ctdb_db)
529 {
530         time_t now = time(NULL);
531         char *new_path;
532         char *new_reason;
533         int ret;
534         struct tm *tm;
535
536         tm = gmtime(&now);
537
538         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
539         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
540                                    "%04u%02u%02u%02u%02u%02u.0Z",
541                                    ctdb_db->db_path,
542                                    tm->tm_year+1900, tm->tm_mon+1,
543                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
544                                    tm->tm_sec);
545         if (new_path == NULL) {
546                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
547                 return -1;
548         }
549
550         new_reason = talloc_asprintf(ctdb_db,
551                                      "ERROR - Backup of corrupted TDB in '%s'",
552                                      new_path);
553         if (new_reason == NULL) {
554                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
555                 return -1;
556         }
557         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
558         talloc_free(new_reason);
559         if (ret != 0) {
560                 DEBUG(DEBUG_CRIT,(__location__
561                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
562                                  ctdb_db->db_path));
563                 return -1;
564         }
565
566         ret = rename(ctdb_db->db_path, new_path);
567         if (ret != 0) {
568                 DEBUG(DEBUG_CRIT,(__location__
569                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
570                                   ctdb_db->db_path, new_path,
571                                   errno, strerror(errno)));
572                 talloc_free(new_path);
573                 return -1;
574         }
575
576         DEBUG(DEBUG_CRIT,(__location__
577                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
578                          ctdb_db->db_path, new_path));
579         talloc_free(new_path);
580         return 0;
581 }
582
583 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
584 {
585         struct ctdb_db_context *ctdb_db;
586         int ret;
587         int ok = 0;
588         int fail = 0;
589
590         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
591                 if (!ctdb_db->persistent) {
592                         continue;
593                 }
594
595                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
596                 if (ret != 0) {
597                         DEBUG(DEBUG_ALERT,(__location__
598                                            " load persistent health for '%s' failed\n",
599                                            ctdb_db->db_path));
600                         return -1;
601                 }
602
603                 if (ctdb_db->unhealthy_reason == NULL) {
604                         ok++;
605                         DEBUG(DEBUG_INFO,(__location__
606                                    " persistent db '%s' healthy\n",
607                                    ctdb_db->db_path));
608                         continue;
609                 }
610
611                 fail++;
612                 DEBUG(DEBUG_ALERT,(__location__
613                                    " persistent db '%s' unhealthy: %s\n",
614                                    ctdb_db->db_path,
615                                    ctdb_db->unhealthy_reason));
616         }
617         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
618               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
619                ok, fail));
620
621         if (fail != 0) {
622                 return -1;
623         }
624
625         return 0;
626 }
627
628
629 /*
630   mark a database - as healthy
631  */
632 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
633 {
634         uint32_t db_id = *(uint32_t *)indata.dptr;
635         struct ctdb_db_context *ctdb_db;
636         int ret;
637         bool may_recover = false;
638
639         ctdb_db = find_ctdb_db(ctdb, db_id);
640         if (!ctdb_db) {
641                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
642                 return -1;
643         }
644
645         if (ctdb_db->unhealthy_reason) {
646                 may_recover = true;
647         }
648
649         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
650         if (ret != 0) {
651                 DEBUG(DEBUG_ERR,(__location__
652                                  " ctdb_update_persistent_health(%s) failed\n",
653                                  ctdb_db->db_name));
654                 return -1;
655         }
656
657         if (may_recover && !ctdb->done_startup) {
658                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
659                                   ctdb_db->db_name));
660                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
661         }
662
663         return 0;
664 }
665
666 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
667                                    TDB_DATA indata,
668                                    TDB_DATA *outdata)
669 {
670         uint32_t db_id = *(uint32_t *)indata.dptr;
671         struct ctdb_db_context *ctdb_db;
672         int ret;
673
674         ctdb_db = find_ctdb_db(ctdb, db_id);
675         if (!ctdb_db) {
676                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
677                 return -1;
678         }
679
680         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
681         if (ret != 0) {
682                 DEBUG(DEBUG_ERR,(__location__
683                                  " ctdb_load_persistent_health(%s) failed\n",
684                                  ctdb_db->db_name));
685                 return -1;
686         }
687
688         *outdata = tdb_null;
689         if (ctdb_db->unhealthy_reason) {
690                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
691                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
692         }
693
694         return 0;
695 }
696
697
698 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
699 {
700         char *ropath;
701
702         if (ctdb_db->readonly) {
703                 return 0;
704         }
705
706         if (ctdb_db->persistent) {
707                 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
708                 return -1;
709         }
710
711         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
712         if (ropath == NULL) {
713                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
714                 return -1;
715         }
716         ctdb_db->rottdb = tdb_open(ropath, 
717                               ctdb->tunable.database_hash_size, 
718                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
719                               O_CREAT|O_RDWR, 0);
720         if (ctdb_db->rottdb == NULL) {
721                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
722                 talloc_free(ropath);
723                 return -1;
724         }
725
726         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
727
728         ctdb_db->readonly = true;
729         talloc_free(ropath);
730         return 0;
731 }
732
733 /*
734   attach to a database, handling both persistent and non-persistent databases
735   return 0 on success, -1 on failure
736  */
737 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
738                              bool persistent, const char *unhealthy_reason,
739                              bool jenkinshash)
740 {
741         struct ctdb_db_context *ctdb_db, *tmp_db;
742         int ret;
743         struct TDB_DATA key;
744         unsigned tdb_flags;
745         int mode = 0600;
746         int remaining_tries = 0;
747
748         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
749         CTDB_NO_MEMORY(ctdb, ctdb_db);
750
751         ctdb_db->priority = 1;
752         ctdb_db->ctdb = ctdb;
753         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
754         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
755
756         key.dsize = strlen(db_name)+1;
757         key.dptr  = discard_const(db_name);
758         ctdb_db->db_id = ctdb_hash(&key);
759         ctdb_db->persistent = persistent;
760
761         if (!ctdb_db->persistent) {
762                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
763                 if (ctdb_db->delete_queue == NULL) {
764                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
765                 }
766
767                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
768         }
769
770         /* check for hash collisions */
771         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
772                 if (tmp_db->db_id == ctdb_db->db_id) {
773                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
774                                  tmp_db->db_id, db_name, tmp_db->db_name));
775                         talloc_free(ctdb_db);
776                         return -1;
777                 }
778         }
779
780         if (persistent) {
781                 if (unhealthy_reason) {
782                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
783                                                             unhealthy_reason, 0);
784                         if (ret != 0) {
785                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
786                                                    ctdb_db->db_name, unhealthy_reason, ret));
787                                 talloc_free(ctdb_db);
788                                 return -1;
789                         }
790                 }
791
792                 if (ctdb->max_persistent_check_errors > 0) {
793                         remaining_tries = 1;
794                 }
795                 if (ctdb->done_startup) {
796                         remaining_tries = 0;
797                 }
798
799                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
800                 if (ret != 0) {
801                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
802                                    ctdb_db->db_name, ret));
803                         talloc_free(ctdb_db);
804                         return -1;
805                 }
806         }
807
808         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
809                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
810                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
811                 talloc_free(ctdb_db);
812                 return -1;
813         }
814
815         if (ctdb_db->unhealthy_reason) {
816                 /* this is just a warning, but we want that in the log file! */
817                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
818                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
819         }
820
821         /* open the database */
822         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
823                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
824                                            db_name, ctdb->pnn);
825
826         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
827         if (ctdb->valgrinding) {
828                 tdb_flags |= TDB_NOMMAP;
829         }
830         tdb_flags |= TDB_DISALLOW_NESTING;
831         if (jenkinshash) {
832                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
833         }
834
835 again:
836         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
837                                       ctdb->tunable.database_hash_size, 
838                                       tdb_flags, 
839                                       O_CREAT|O_RDWR, mode);
840         if (ctdb_db->ltdb == NULL) {
841                 struct stat st;
842                 int saved_errno = errno;
843
844                 if (!persistent) {
845                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
846                                           ctdb_db->db_path,
847                                           saved_errno,
848                                           strerror(saved_errno)));
849                         talloc_free(ctdb_db);
850                         return -1;
851                 }
852
853                 if (remaining_tries == 0) {
854                         DEBUG(DEBUG_CRIT,(__location__
855                                           "Failed to open persistent tdb '%s': %d - %s\n",
856                                           ctdb_db->db_path,
857                                           saved_errno,
858                                           strerror(saved_errno)));
859                         talloc_free(ctdb_db);
860                         return -1;
861                 }
862
863                 ret = stat(ctdb_db->db_path, &st);
864                 if (ret != 0) {
865                         DEBUG(DEBUG_CRIT,(__location__
866                                           "Failed to open persistent tdb '%s': %d - %s\n",
867                                           ctdb_db->db_path,
868                                           saved_errno,
869                                           strerror(saved_errno)));
870                         talloc_free(ctdb_db);
871                         return -1;
872                 }
873
874                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
875                 if (ret != 0) {
876                         DEBUG(DEBUG_CRIT,(__location__
877                                           "Failed to open persistent tdb '%s': %d - %s\n",
878                                           ctdb_db->db_path,
879                                           saved_errno,
880                                           strerror(saved_errno)));
881                         talloc_free(ctdb_db);
882                         return -1;
883                 }
884
885                 remaining_tries--;
886                 mode = st.st_mode;
887                 goto again;
888         }
889
890         if (!persistent) {
891                 ctdb_check_db_empty(ctdb_db);
892         } else {
893                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
894                 if (ret != 0) {
895                         int fd;
896                         struct stat st;
897
898                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
899                                           ctdb_db->db_path, ret,
900                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
901                         if (remaining_tries == 0) {
902                                 talloc_free(ctdb_db);
903                                 return -1;
904                         }
905
906                         fd = tdb_fd(ctdb_db->ltdb->tdb);
907                         ret = fstat(fd, &st);
908                         if (ret != 0) {
909                                 DEBUG(DEBUG_CRIT,(__location__
910                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
911                                                   ctdb_db->db_path,
912                                                   errno,
913                                                   strerror(errno)));
914                                 talloc_free(ctdb_db);
915                                 return -1;
916                         }
917
918                         /* close the TDB */
919                         talloc_free(ctdb_db->ltdb);
920                         ctdb_db->ltdb = NULL;
921
922                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
923                         if (ret != 0) {
924                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
925                                                   ctdb_db->db_path));
926                                 talloc_free(ctdb_db);
927                                 return -1;
928                         }
929
930                         remaining_tries--;
931                         mode = st.st_mode;
932                         goto again;
933                 }
934         }
935
936         /* set up a rb tree we can use to track which records we have a 
937            fetch-lock in-flight for so we can defer any additional calls
938            for the same record.
939          */
940         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
941         if (ctdb_db->deferred_fetch == NULL) {
942                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
943                 talloc_free(ctdb_db);
944                 return -1;
945         }
946
947         DLIST_ADD(ctdb->db_list, ctdb_db);
948
949         /* setting this can help some high churn databases */
950         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
951
952         /* 
953            all databases support the "null" function. we need this in
954            order to do forced migration of records
955         */
956         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
957         if (ret != 0) {
958                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
959                 talloc_free(ctdb_db);
960                 return -1;
961         }
962
963         /* 
964            all databases support the "fetch" function. we need this
965            for efficient Samba3 ctdb fetch
966         */
967         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
968         if (ret != 0) {
969                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
970                 talloc_free(ctdb_db);
971                 return -1;
972         }
973
974         /* 
975            all databases support the "fetch_with_header" function. we need this
976            for efficient readonly record fetches
977         */
978         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
979         if (ret != 0) {
980                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
981                 talloc_free(ctdb_db);
982                 return -1;
983         }
984
985         ret = ctdb_vacuum_init(ctdb_db);
986         if (ret != 0) {
987                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
988                                   "database '%s'\n", ctdb_db->db_name));
989                 talloc_free(ctdb_db);
990                 return -1;
991         }
992
993
994         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
995         
996         /* success */
997         return 0;
998 }
999
1000
/*
  one client attach control that has been put on hold; kept on the
  ctdb->deferred_attach list until it is replayed or times out
 */
struct ctdb_deferred_attach_context {
	/* doubly linked list pointers (used with the DLIST_* macros) */
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	/* daemon context owning the deferred_attach list */
	struct ctdb_context *ctdb;
	/* the original control request packet */
	struct ctdb_req_control *c;
};
1006
1007
1008 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1009 {
1010         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1011
1012         return 0;
1013 }
1014
1015 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1016 {
1017         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1018         struct ctdb_context *ctdb = da_ctx->ctdb;
1019
1020         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1021         talloc_free(da_ctx);
1022 }
1023
1024 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1025 {
1026         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1027         struct ctdb_context *ctdb = da_ctx->ctdb;
1028
1029         /* This talloc-steals the packet ->c */
1030         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1031         talloc_free(da_ctx);
1032 }
1033
1034 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1035 {
1036         struct ctdb_deferred_attach_context *da_ctx;
1037
1038         /* call it from the main event loop as soon as the current event 
1039            finishes.
1040          */
1041         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1042                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1043                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1044         }
1045
1046         return 0;
1047 }
1048
1049 /*
1050   a client has asked to attach a new database
1051  */
1052 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1053                                TDB_DATA *outdata, uint64_t tdb_flags, 
1054                                bool persistent, uint32_t client_id,
1055                                struct ctdb_req_control *c,
1056                                bool *async_reply)
1057 {
1058         const char *db_name = (const char *)indata.dptr;
1059         struct ctdb_db_context *db;
1060         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1061         struct ctdb_client *client = NULL;
1062
1063         if (ctdb->tunable.allow_client_db_attach == 0) {
1064                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1065                                   "AllowClientDBAccess == 0\n", db_name));
1066                 return -1;
1067         }
1068
1069         /* dont allow any local clients to attach while we are in recovery mode
1070          * except for the recovery daemon.
1071          * allow all attach from the network since these are always from remote
1072          * recovery daemons.
1073          */
1074         if (client_id != 0) {
1075                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1076         }
1077         if (client != NULL) {
1078                 /* If the node is inactive it is not part of the cluster
1079                    and we should not allow clients to attach to any
1080                    databases
1081                 */
1082                 if (node->flags & NODE_FLAGS_INACTIVE) {
1083                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1084                         return -1;
1085                 }
1086
1087                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1088                  && client->pid != ctdb->recoverd_pid
1089                  && !ctdb->done_startup) {
1090                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1091
1092                         if (da_ctx == NULL) {
1093                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1094                                 return -1;
1095                         }
1096
1097                         da_ctx->ctdb = ctdb;
1098                         da_ctx->c = talloc_steal(da_ctx, c);
1099                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1100                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1101
1102                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1103
1104                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1105                         *async_reply = true;
1106                         return 0;
1107                 }
1108         }
1109
1110         /* the client can optionally pass additional tdb flags, but we
1111            only allow a subset of those on the database in ctdb. Note
1112            that tdb_flags is passed in via the (otherwise unused)
1113            srvid to the attach control */
1114         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1115
1116         /* see if we already have this name */
1117         db = ctdb_db_handle(ctdb, db_name);
1118         if (db) {
1119                 outdata->dptr  = (uint8_t *)&db->db_id;
1120                 outdata->dsize = sizeof(db->db_id);
1121                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1122                 return 0;
1123         }
1124
1125         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1126                 return -1;
1127         }
1128
1129         db = ctdb_db_handle(ctdb, db_name);
1130         if (!db) {
1131                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1132                 return -1;
1133         }
1134
1135         /* remember the flags the client has specified */
1136         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1137
1138         outdata->dptr  = (uint8_t *)&db->db_id;
1139         outdata->dsize = sizeof(db->db_id);
1140
1141         /* Try to ensure it's locked in mem */
1142         ctdb_lockdown_memory(ctdb);
1143
1144         /* tell all the other nodes about this database */
1145         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1146                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1147                                                 CTDB_CONTROL_DB_ATTACH,
1148                                  0, CTDB_CTRL_FLAG_NOREPLY,
1149                                  indata, NULL, NULL);
1150
1151         /* success */
1152         return 0;
1153 }
1154
1155
1156 /*
1157   attach to all existing persistent databases
1158  */
1159 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1160                                   const char *unhealthy_reason)
1161 {
1162         DIR *d;
1163         struct dirent *de;
1164
1165         /* open the persistent db directory and scan it for files */
1166         d = opendir(ctdb->db_directory_persistent);
1167         if (d == NULL) {
1168                 return 0;
1169         }
1170
1171         while ((de=readdir(d))) {
1172                 char *p, *s, *q;
1173                 size_t len = strlen(de->d_name);
1174                 uint32_t node;
1175                 int invalid_name = 0;
1176                 
1177                 s = talloc_strdup(ctdb, de->d_name);
1178                 CTDB_NO_MEMORY(ctdb, s);
1179
1180                 /* only accept names ending in .tdb */
1181                 p = strstr(s, ".tdb.");
1182                 if (len < 7 || p == NULL) {
1183                         talloc_free(s);
1184                         continue;
1185                 }
1186
1187                 /* only accept names ending with .tdb. and any number of digits */
1188                 q = p+5;
1189                 while (*q != 0 && invalid_name == 0) {
1190                         if (!isdigit(*q++)) {
1191                                 invalid_name = 1;
1192                         }
1193                 }
1194                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1195                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1196                         talloc_free(s);
1197                         continue;
1198                 }
1199                 p[4] = 0;
1200
1201                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1202                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1203                         closedir(d);
1204                         talloc_free(s);
1205                         return -1;
1206                 }
1207
1208                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1209
1210                 talloc_free(s);
1211         }
1212         closedir(d);
1213         return 0;
1214 }
1215
1216 int ctdb_attach_databases(struct ctdb_context *ctdb)
1217 {
1218         int ret;
1219         char *persistent_health_path = NULL;
1220         char *unhealthy_reason = NULL;
1221         bool first_try = true;
1222
1223         if (ctdb->db_directory == NULL) {
1224                 ctdb->db_directory = VARDIR "/ctdb";
1225         }
1226         if (ctdb->db_directory_persistent == NULL) {
1227                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1228         }
1229         if (ctdb->db_directory_state == NULL) {
1230                 ctdb->db_directory_state = VARDIR "/ctdb/state";
1231         }
1232
1233         /* make sure the db directory exists */
1234         ret = mkdir(ctdb->db_directory, 0700);
1235         if (ret == -1 && errno != EEXIST) {
1236                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1237                          ctdb->db_directory));
1238                 return -1;
1239         }
1240
1241         /* make sure the persistent db directory exists */
1242         ret = mkdir(ctdb->db_directory_persistent, 0700);
1243         if (ret == -1 && errno != EEXIST) {
1244                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1245                          ctdb->db_directory_persistent));
1246                 return -1;
1247         }
1248
1249         /* make sure the internal state db directory exists */
1250         ret = mkdir(ctdb->db_directory_state, 0700);
1251         if (ret == -1 && errno != EEXIST) {
1252                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1253                          ctdb->db_directory_state));
1254                 return -1;
1255         }
1256
1257         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1258                                                  ctdb->db_directory_state,
1259                                                  PERSISTENT_HEALTH_TDB,
1260                                                  ctdb->pnn);
1261         if (persistent_health_path == NULL) {
1262                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1263                 return -1;
1264         }
1265
1266 again:
1267
1268         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1269                                                    0, TDB_DISALLOW_NESTING,
1270                                                    O_CREAT | O_RDWR, 0600);
1271         if (ctdb->db_persistent_health == NULL) {
1272                 struct tdb_wrap *tdb;
1273
1274                 if (!first_try) {
1275                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1276                                           persistent_health_path,
1277                                           errno,
1278                                           strerror(errno)));
1279                         talloc_free(persistent_health_path);
1280                         talloc_free(unhealthy_reason);
1281                         return -1;
1282                 }
1283                 first_try = false;
1284
1285                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1286                                                    persistent_health_path,
1287                                                    "was cleared after a failure",
1288                                                    "manual verification needed");
1289                 if (unhealthy_reason == NULL) {
1290                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1291                         talloc_free(persistent_health_path);
1292                         return -1;
1293                 }
1294
1295                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1296                                   persistent_health_path));
1297                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1298                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1299                                     O_CREAT | O_RDWR, 0600);
1300                 if (tdb) {
1301                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1302                                           persistent_health_path,
1303                                           errno,
1304                                           strerror(errno)));
1305                         talloc_free(persistent_health_path);
1306                         talloc_free(unhealthy_reason);
1307                         return -1;
1308                 }
1309
1310                 talloc_free(tdb);
1311                 goto again;
1312         }
1313         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1314         if (ret != 0) {
1315                 struct tdb_wrap *tdb;
1316
1317                 talloc_free(ctdb->db_persistent_health);
1318                 ctdb->db_persistent_health = NULL;
1319
1320                 if (!first_try) {
1321                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1322                                           persistent_health_path));
1323                         talloc_free(persistent_health_path);
1324                         talloc_free(unhealthy_reason);
1325                         return -1;
1326                 }
1327                 first_try = false;
1328
1329                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1330                                                    persistent_health_path,
1331                                                    "was cleared after a failure",
1332                                                    "manual verification needed");
1333                 if (unhealthy_reason == NULL) {
1334                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1335                         talloc_free(persistent_health_path);
1336                         return -1;
1337                 }
1338
1339                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1340                                   persistent_health_path));
1341                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1342                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1343                                     O_CREAT | O_RDWR, 0600);
1344                 if (tdb) {
1345                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1346                                           persistent_health_path,
1347                                           errno,
1348                                           strerror(errno)));
1349                         talloc_free(persistent_health_path);
1350                         talloc_free(unhealthy_reason);
1351                         return -1;
1352                 }
1353
1354                 talloc_free(tdb);
1355                 goto again;
1356         }
1357         talloc_free(persistent_health_path);
1358
1359         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1360         talloc_free(unhealthy_reason);
1361         if (ret != 0) {
1362                 return ret;
1363         }
1364
1365         return 0;
1366 }
1367
1368 /*
1369   called when a broadcast seqnum update comes in
1370  */
1371 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1372 {
1373         struct ctdb_db_context *ctdb_db;
1374         if (srcnode == ctdb->pnn) {
1375                 /* don't update ourselves! */
1376                 return 0;
1377         }
1378
1379         ctdb_db = find_ctdb_db(ctdb, db_id);
1380         if (!ctdb_db) {
1381                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1382                 return -1;
1383         }
1384
1385         if (ctdb_db->unhealthy_reason) {
1386                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1387                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1388                 return -1;
1389         }
1390
1391         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1392         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1393         return 0;
1394 }
1395
1396 /*
1397   timer to check for seqnum changes in a ltdb and propogate them
1398  */
1399 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1400                                    struct timeval t, void *p)
1401 {
1402         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1403         struct ctdb_context *ctdb = ctdb_db->ctdb;
1404         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1405         if (new_seqnum != ctdb_db->seqnum) {
1406                 /* something has changed - propogate it */
1407                 TDB_DATA data;
1408                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1409                 data.dsize = sizeof(uint32_t);
1410                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1411                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1412                                          data, NULL, NULL);             
1413         }
1414         ctdb_db->seqnum = new_seqnum;
1415
1416         /* setup a new timer */
1417         ctdb_db->seqnum_update =
1418                 event_add_timed(ctdb->ev, ctdb_db, 
1419                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1420                                 ctdb_ltdb_seqnum_check, ctdb_db);
1421 }
1422
1423 /*
1424   enable seqnum handling on this db
1425  */
1426 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1427 {
1428         struct ctdb_db_context *ctdb_db;
1429         ctdb_db = find_ctdb_db(ctdb, db_id);
1430         if (!ctdb_db) {
1431                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1432                 return -1;
1433         }
1434
1435         if (ctdb_db->seqnum_update == NULL) {
1436                 ctdb_db->seqnum_update =
1437                         event_add_timed(ctdb->ev, ctdb_db, 
1438                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1439                                         ctdb_ltdb_seqnum_check, ctdb_db);
1440         }
1441
1442         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1443         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1444         return 0;
1445 }
1446
1447 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1448 {
1449         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1450         struct ctdb_db_context *ctdb_db;
1451
1452         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1453         if (!ctdb_db) {
1454                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1455                 return 0;
1456         }
1457
1458         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1459                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1460                 return 0;
1461         }
1462
1463         ctdb_db->priority = db_prio->priority;
1464         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1465
1466         return 0;
1467 }
1468
1469
1470 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1471 {
1472
1473         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1474
1475         if (ctdb_db->sticky) {
1476                 return 0;
1477         }
1478
1479         if (ctdb_db->persistent) {
1480                 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1481                 return -1;
1482         }
1483
1484         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1485
1486         ctdb_db->sticky = true;
1487
1488         return 0;
1489 }
1490
1491 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1492                                 uint32_t db_id,
1493                                 TDB_DATA *outdata)
1494 {
1495         struct ctdb_db_context *ctdb_db;
1496
1497         ctdb_db = find_ctdb_db(ctdb, db_id);
1498         if (!ctdb_db) {
1499                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1500                 return -1;
1501         }
1502
1503         outdata->dptr  = (uint8_t *)&(ctdb_db->statistics);
1504         outdata->dsize = sizeof(ctdb_db->statistics);
1505
1506         return 0;
1507 }