ctdb-vacuum: revert "Do not delete VACUUM MIGRATED records immediately"
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "tdb.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
33
34 /**
35  * write a record to a normal database
36  *
37  * This is the server-variant of the ctdb_ltdb_store function.
38  * It contains logic to determine whether a record should be
39  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
40  * controls to the local ctdb daemon if apporpriate.
41  */
42 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
43                                   TDB_DATA key,
44                                   struct ctdb_ltdb_header *header,
45                                   TDB_DATA data)
46 {
47         struct ctdb_context *ctdb = ctdb_db->ctdb;
48         TDB_DATA rec;
49         int ret;
50         bool seqnum_suppressed = false;
51         bool keep = false;
52         bool schedule_for_deletion = false;
53         bool remove_from_delete_queue = false;
54         uint32_t lmaster;
55
56         if (ctdb->flags & CTDB_FLAG_TORTURE) {
57                 struct ctdb_ltdb_header *h2;
58                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
59                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
60                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
61                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
62                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
63                 }
64                 if (rec.dptr) free(rec.dptr);
65         }
66
67         if (ctdb->vnn_map == NULL) {
68                 /*
69                  * Called from a client: always store the record
70                  * Also don't call ctdb_lmaster since it uses the vnn_map!
71                  */
72                 keep = true;
73                 goto store;
74         }
75
76         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
77
78         /*
79          * If we migrate an empty record off to another node
80          * and the record has not been migrated with data,
81          * delete the record instead of storing the empty record.
82          */
83         if (data.dsize != 0) {
84                 keep = true;
85         } else if (header->flags & CTDB_REC_RO_FLAGS) {
86                 keep = true;
87         } else if (ctdb_db->persistent) {
88                 keep = true;
89         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
90                 /*
91                  * The record is not created by the client but
92                  * automatically by the ctdb_ltdb_fetch logic that
93                  * creates a record with an initial header in the
94                  * ltdb before trying to migrate the record from
95                  * the current lmaster. Keep it instead of trying
96                  * to delete the non-existing record...
97                  */
98                 keep = true;
99                 schedule_for_deletion = true;
100         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
101                 keep = true;
102         } else if (ctdb_db->ctdb->pnn == lmaster) {
103                 /*
104                  * If we are lmaster, then we usually keep the record.
105                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
106                  * and the record is empty and has never been migrated
107                  * with data, then we should delete it instead of storing it.
108                  * This is part of the vacuuming process.
109                  *
110                  * The reason that we usually need to store even empty records
111                  * on the lmaster is that a client operating directly on the
112                  * lmaster (== dmaster) expects the local copy of the record to
113                  * exist after successful ctdb migrate call. If the record does
114                  * not exist, the client goes into a migrate loop and eventually
115                  * fails. So storing the empty record makes sure that we do not
116                  * need to change the client code.
117                  */
118                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
119                         keep = true;
120                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
121                         keep = true;
122                 }
123         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
124                 keep = true;
125         }
126
127         if (keep) {
128                 if (!ctdb_db->persistent &&
129                     (ctdb_db->ctdb->pnn == header->dmaster) &&
130                     !(header->flags & CTDB_REC_RO_FLAGS))
131                 {
132                         header->rsn++;
133
134                         if (data.dsize == 0) {
135                                 schedule_for_deletion = true;
136                         }
137                 }
138                 remove_from_delete_queue = !schedule_for_deletion;
139         }
140
141 store:
142         /*
143          * The VACUUM_MIGRATED flag is only set temporarily for
144          * the above logic when the record was retrieved by a
145          * VACUUM_MIGRATE call and should not be stored in the
146          * database.
147          *
148          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
149          * and there are two cases in which the corresponding record
150          * is stored in the local database:
151          * 1. The record has been migrated with data in the past
152          *    (the MIGRATED_WITH_DATA record flag is set).
153          * 2. The record has been filled with data again since it
154          *    had been submitted in the VACUUM_FETCH message to the
155          *    lmaster.
156          * For such records it is important to not store the
157          * VACUUM_MIGRATED flag in the database.
158          */
159         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
160
161         /*
162          * Similarly, clear the AUTOMATIC flag which should not enter
163          * the local database copy since this would require client
164          * modifications to clear the flag when the client stores
165          * the record.
166          */
167         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
168
169         rec.dsize = sizeof(*header) + data.dsize;
170         rec.dptr = talloc_size(ctdb, rec.dsize);
171         CTDB_NO_MEMORY(ctdb, rec.dptr);
172
173         memcpy(rec.dptr, header, sizeof(*header));
174         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
175
176         /* Databases with seqnum updates enabled only get their seqnum
177            changes when/if we modify the data */
178         if (ctdb_db->seqnum_update != NULL) {
179                 TDB_DATA old;
180                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
181
182                 if ( (old.dsize == rec.dsize)
183                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
184                           rec.dptr+sizeof(struct ctdb_ltdb_header),
185                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
186                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
187                         seqnum_suppressed = true;
188                 }
189                 if (old.dptr) free(old.dptr);
190         }
191
192         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
193                             ctdb_db->db_name,
194                             keep?"storing":"deleting",
195                             ctdb_hash(&key)));
196
197         if (keep) {
198                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
199         } else {
200                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
201         }
202
203         if (ret != 0) {
204                 int lvl = DEBUG_ERR;
205
206                 if (keep == false &&
207                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
208                 {
209                         lvl = DEBUG_DEBUG;
210                 }
211
212                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
213                             "%d - %s\n",
214                             ctdb_db->db_name,
215                             keep?"store":"delete", ret,
216                             tdb_errorstr(ctdb_db->ltdb->tdb)));
217
218                 schedule_for_deletion = false;
219                 remove_from_delete_queue = false;
220         }
221         if (seqnum_suppressed) {
222                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
223         }
224
225         talloc_free(rec.dptr);
226
227         if (schedule_for_deletion) {
228                 int ret2;
229                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
230                 if (ret2 != 0) {
231                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
232                 }
233         }
234
235         if (remove_from_delete_queue) {
236                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
237         }
238
239         return ret;
240 }
241
/*
  state kept for a request that is waiting for a record chainlock;
  filled in by ctdb_ltdb_lock_requeue() and consumed by
  lock_fetch_callback()
 */
struct lock_fetch_state {
        /* daemon context; its vnn_map generation is compared on retry */
        struct ctdb_context *ctdb;

        /* function the deferred packet is handed back to */
        void (*recv_pkt)(void *, struct ctdb_req_header *);

        /* first argument passed to recv_pkt */
        void *recv_context;

        /* the deferred request packet */
        struct ctdb_req_header *hdr;

        /* vnn_map generation recorded when the request was queued */
        uint32_t generation;

        /* when true, the packet is resubmitted even if the generation changed */
        bool ignore_generation;
};
250
251 /*
252   called when we should retry the operation
253  */
254 static void lock_fetch_callback(void *p, bool locked)
255 {
256         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
257         if (!state->ignore_generation &&
258             state->generation != state->ctdb->vnn_map->generation) {
259                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
260                 talloc_free(state->hdr);
261                 return;
262         }
263         state->recv_pkt(state->recv_context, state->hdr);
264         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
265 }
266
267
268 /*
269   do a non-blocking ltdb_lock, deferring this ctdb request until we
270   have the chainlock
271
272   It does the following:
273
274    1) tries to get the chainlock. If it succeeds, then it returns 0
275
276    2) if it fails to get a chainlock immediately then it sets up a
277    non-blocking chainlock via ctdb_lock_record, and when it gets the
278    chainlock it re-submits this ctdb request to the main packet
279    receive function.
280
281    This effectively queues all ctdb requests that cannot be
282    immediately satisfied until it can get the lock. This means that
283    the main ctdb daemon will not block waiting for a chainlock held by
284    a client
285
286    There are 3 possible return values:
287
288        0:    means that it got the lock immediately.
289       -1:    means that it failed to get the lock, and won't retry
290       -2:    means that it failed to get the lock immediately, but will retry
291  */
292 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
293                            TDB_DATA key, struct ctdb_req_header *hdr,
294                            void (*recv_pkt)(void *, struct ctdb_req_header *),
295                            void *recv_context, bool ignore_generation)
296 {
297         int ret;
298         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
299         struct lock_request *lreq;
300         struct lock_fetch_state *state;
301         
302         ret = tdb_chainlock_nonblock(tdb, key);
303
304         if (ret != 0 &&
305             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
306                 /* a hard failure - don't try again */
307                 return -1;
308         }
309
310         /* when torturing, ensure we test the contended path */
311         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
312             random() % 5 == 0) {
313                 ret = -1;
314                 tdb_chainunlock(tdb, key);
315         }
316
317         /* first the non-contended path */
318         if (ret == 0) {
319                 return 0;
320         }
321
322         state = talloc(hdr, struct lock_fetch_state);
323         state->ctdb = ctdb_db->ctdb;
324         state->hdr = hdr;
325         state->recv_pkt = recv_pkt;
326         state->recv_context = recv_context;
327         state->generation = ctdb_db->ctdb->vnn_map->generation;
328         state->ignore_generation = ignore_generation;
329
330         /* now the contended path */
331         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
332         if (lreq == NULL) {
333                 return -1;
334         }
335
336         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
337            so it won't be freed yet */
338         talloc_steal(state, hdr);
339
340         /* now tell the caller than we will retry asynchronously */
341         return -2;
342 }
343
344 /*
345   a varient of ctdb_ltdb_lock_requeue that also fetches the record
346  */
347 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
348                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
349                                  struct ctdb_req_header *hdr, TDB_DATA *data,
350                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
351                                  void *recv_context, bool ignore_generation)
352 {
353         int ret;
354
355         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
356                                      recv_context, ignore_generation);
357         if (ret == 0) {
358                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
359                 if (ret != 0) {
360                         int uret;
361                         uret = ctdb_ltdb_unlock(ctdb_db, key);
362                         if (uret != 0) {
363                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
364                         }
365                 }
366         }
367         return ret;
368 }
369
370
371 /*
372   paraoid check to see if the db is empty
373  */
374 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
375 {
376         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
377         int count = tdb_traverse_read(tdb, NULL, NULL);
378         if (count != 0) {
379                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
380                          ctdb_db->db_path));
381                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
382         }
383 }
384
385 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
386                                 struct ctdb_db_context *ctdb_db)
387 {
388         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
389         char *old;
390         char *reason = NULL;
391         TDB_DATA key;
392         TDB_DATA val;
393
394         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
395         key.dsize = strlen(ctdb_db->db_name);
396
397         old = ctdb_db->unhealthy_reason;
398         ctdb_db->unhealthy_reason = NULL;
399
400         val = tdb_fetch(tdb, key);
401         if (val.dsize > 0) {
402                 reason = talloc_strndup(ctdb_db,
403                                         (const char *)val.dptr,
404                                         val.dsize);
405                 if (reason == NULL) {
406                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
407                                            (int)val.dsize));
408                         ctdb_db->unhealthy_reason = old;
409                         free(val.dptr);
410                         return -1;
411                 }
412         }
413
414         if (val.dptr) {
415                 free(val.dptr);
416         }
417
418         talloc_free(old);
419         ctdb_db->unhealthy_reason = reason;
420         return 0;
421 }
422
423 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
424                                   struct ctdb_db_context *ctdb_db,
425                                   const char *given_reason,/* NULL means healthy */
426                                   int num_healthy_nodes)
427 {
428         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
429         int ret;
430         TDB_DATA key;
431         TDB_DATA val;
432         char *new_reason = NULL;
433         char *old_reason = NULL;
434
435         ret = tdb_transaction_start(tdb);
436         if (ret != 0) {
437                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
438                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
439                 return -1;
440         }
441
442         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
443         if (ret != 0) {
444                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
445                                    ctdb_db->db_name, ret));
446                 return -1;
447         }
448         old_reason = ctdb_db->unhealthy_reason;
449
450         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
451         key.dsize = strlen(ctdb_db->db_name);
452
453         if (given_reason) {
454                 new_reason = talloc_strdup(ctdb_db, given_reason);
455                 if (new_reason == NULL) {
456                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
457                                           given_reason));
458                         return -1;
459                 }
460         } else if (old_reason && num_healthy_nodes == 0) {
461                 /*
462                  * If the reason indicates ok, but there where no healthy nodes
463                  * available, that it means, we have not recovered valid content
464                  * of the db. So if there's an old reason, prefix it with
465                  * "NO-HEALTHY-NODES - "
466                  */
467                 const char *prefix;
468
469 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
470                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
471                 if (ret != 0) {
472                         prefix = _TMP_PREFIX;
473                 } else {
474                         prefix = "";
475                 }
476                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
477                                          prefix, old_reason);
478                 if (new_reason == NULL) {
479                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
480                                           prefix, old_reason));
481                         return -1;
482                 }
483 #undef _TMP_PREFIX
484         }
485
486         if (new_reason) {
487                 val.dptr = discard_const_p(uint8_t, new_reason);
488                 val.dsize = strlen(new_reason);
489
490                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
491                 if (ret != 0) {
492                         tdb_transaction_cancel(tdb);
493                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
494                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
495                                            ret, tdb_errorstr(tdb)));
496                         talloc_free(new_reason);
497                         return -1;
498                 }
499                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
500                                    ctdb_db->db_name, new_reason));
501         } else if (old_reason) {
502                 ret = tdb_delete(tdb, key);
503                 if (ret != 0) {
504                         tdb_transaction_cancel(tdb);
505                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
506                                            tdb_name(tdb), ctdb_db->db_name,
507                                            ret, tdb_errorstr(tdb)));
508                         talloc_free(new_reason);
509                         return -1;
510                 }
511                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
512                                    ctdb_db->db_name));
513         }
514
515         ret = tdb_transaction_commit(tdb);
516         if (ret != TDB_SUCCESS) {
517                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
518                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
519                 talloc_free(new_reason);
520                 return -1;
521         }
522
523         talloc_free(old_reason);
524         ctdb_db->unhealthy_reason = new_reason;
525
526         return 0;
527 }
528
529 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
530                                      struct ctdb_db_context *ctdb_db)
531 {
532         time_t now = time(NULL);
533         char *new_path;
534         char *new_reason;
535         int ret;
536         struct tm *tm;
537
538         tm = gmtime(&now);
539
540         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
541         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
542                                    "%04u%02u%02u%02u%02u%02u.0Z",
543                                    ctdb_db->db_path,
544                                    tm->tm_year+1900, tm->tm_mon+1,
545                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
546                                    tm->tm_sec);
547         if (new_path == NULL) {
548                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
549                 return -1;
550         }
551
552         new_reason = talloc_asprintf(ctdb_db,
553                                      "ERROR - Backup of corrupted TDB in '%s'",
554                                      new_path);
555         if (new_reason == NULL) {
556                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
557                 return -1;
558         }
559         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
560         talloc_free(new_reason);
561         if (ret != 0) {
562                 DEBUG(DEBUG_CRIT,(__location__
563                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
564                                  ctdb_db->db_path));
565                 return -1;
566         }
567
568         ret = rename(ctdb_db->db_path, new_path);
569         if (ret != 0) {
570                 DEBUG(DEBUG_CRIT,(__location__
571                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
572                                   ctdb_db->db_path, new_path,
573                                   errno, strerror(errno)));
574                 talloc_free(new_path);
575                 return -1;
576         }
577
578         DEBUG(DEBUG_CRIT,(__location__
579                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
580                          ctdb_db->db_path, new_path));
581         talloc_free(new_path);
582         return 0;
583 }
584
585 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
586 {
587         struct ctdb_db_context *ctdb_db;
588         int ret;
589         int ok = 0;
590         int fail = 0;
591
592         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
593                 if (!ctdb_db->persistent) {
594                         continue;
595                 }
596
597                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
598                 if (ret != 0) {
599                         DEBUG(DEBUG_ALERT,(__location__
600                                            " load persistent health for '%s' failed\n",
601                                            ctdb_db->db_path));
602                         return -1;
603                 }
604
605                 if (ctdb_db->unhealthy_reason == NULL) {
606                         ok++;
607                         DEBUG(DEBUG_INFO,(__location__
608                                    " persistent db '%s' healthy\n",
609                                    ctdb_db->db_path));
610                         continue;
611                 }
612
613                 fail++;
614                 DEBUG(DEBUG_ALERT,(__location__
615                                    " persistent db '%s' unhealthy: %s\n",
616                                    ctdb_db->db_path,
617                                    ctdb_db->unhealthy_reason));
618         }
619         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
620               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
621                ok, fail));
622
623         if (fail != 0) {
624                 return -1;
625         }
626
627         return 0;
628 }
629
630
631 /*
632   mark a database - as healthy
633  */
634 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
635 {
636         uint32_t db_id = *(uint32_t *)indata.dptr;
637         struct ctdb_db_context *ctdb_db;
638         int ret;
639         bool may_recover = false;
640
641         ctdb_db = find_ctdb_db(ctdb, db_id);
642         if (!ctdb_db) {
643                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
644                 return -1;
645         }
646
647         if (ctdb_db->unhealthy_reason) {
648                 may_recover = true;
649         }
650
651         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
652         if (ret != 0) {
653                 DEBUG(DEBUG_ERR,(__location__
654                                  " ctdb_update_persistent_health(%s) failed\n",
655                                  ctdb_db->db_name));
656                 return -1;
657         }
658
659         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
660                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
661                                   ctdb_db->db_name));
662                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
663         }
664
665         return 0;
666 }
667
668 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
669                                    TDB_DATA indata,
670                                    TDB_DATA *outdata)
671 {
672         uint32_t db_id = *(uint32_t *)indata.dptr;
673         struct ctdb_db_context *ctdb_db;
674         int ret;
675
676         ctdb_db = find_ctdb_db(ctdb, db_id);
677         if (!ctdb_db) {
678                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
679                 return -1;
680         }
681
682         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
683         if (ret != 0) {
684                 DEBUG(DEBUG_ERR,(__location__
685                                  " ctdb_load_persistent_health(%s) failed\n",
686                                  ctdb_db->db_name));
687                 return -1;
688         }
689
690         *outdata = tdb_null;
691         if (ctdb_db->unhealthy_reason) {
692                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
693                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
694         }
695
696         return 0;
697 }
698
699
700 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
701 {
702         char *ropath;
703
704         if (ctdb_db->readonly) {
705                 return 0;
706         }
707
708         if (ctdb_db->persistent) {
709                 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
710                 return -1;
711         }
712
713         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
714         if (ropath == NULL) {
715                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
716                 return -1;
717         }
718         ctdb_db->rottdb = tdb_open(ropath, 
719                               ctdb->tunable.database_hash_size, 
720                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
721                               O_CREAT|O_RDWR, 0);
722         if (ctdb_db->rottdb == NULL) {
723                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
724                 talloc_free(ropath);
725                 return -1;
726         }
727
728         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
729
730         ctdb_db->readonly = true;
731
732         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
733
734         talloc_free(ropath);
735         return 0;
736 }
737
738 /*
739   attach to a database, handling both persistent and non-persistent databases
740   return 0 on success, -1 on failure
741  */
742 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
743                              bool persistent, const char *unhealthy_reason,
744                              bool jenkinshash, bool mutexes)
745 {
746         struct ctdb_db_context *ctdb_db, *tmp_db;
747         int ret;
748         struct TDB_DATA key;
749         unsigned tdb_flags;
750         int mode = 0600;
751         int remaining_tries = 0;
752
753         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
754         CTDB_NO_MEMORY(ctdb, ctdb_db);
755
756         ctdb_db->priority = 1;
757         ctdb_db->ctdb = ctdb;
758         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
759         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
760
761         key.dsize = strlen(db_name)+1;
762         key.dptr  = discard_const(db_name);
763         ctdb_db->db_id = ctdb_hash(&key);
764         ctdb_db->persistent = persistent;
765
766         if (!ctdb_db->persistent) {
767                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
768                 if (ctdb_db->delete_queue == NULL) {
769                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
770                 }
771
772                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
773         }
774
775         /* check for hash collisions */
776         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
777                 if (tmp_db->db_id == ctdb_db->db_id) {
778                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
779                                  tmp_db->db_id, db_name, tmp_db->db_name));
780                         talloc_free(ctdb_db);
781                         return -1;
782                 }
783         }
784
785         if (persistent) {
786                 if (unhealthy_reason) {
787                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
788                                                             unhealthy_reason, 0);
789                         if (ret != 0) {
790                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
791                                                    ctdb_db->db_name, unhealthy_reason, ret));
792                                 talloc_free(ctdb_db);
793                                 return -1;
794                         }
795                 }
796
797                 if (ctdb->max_persistent_check_errors > 0) {
798                         remaining_tries = 1;
799                 }
800                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
801                         remaining_tries = 0;
802                 }
803
804                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
805                 if (ret != 0) {
806                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
807                                    ctdb_db->db_name, ret));
808                         talloc_free(ctdb_db);
809                         return -1;
810                 }
811         }
812
813         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
814                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
815                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
816                 talloc_free(ctdb_db);
817                 return -1;
818         }
819
820         if (ctdb_db->unhealthy_reason) {
821                 /* this is just a warning, but we want that in the log file! */
822                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
823                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
824         }
825
826         /* open the database */
827         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
828                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
829                                            db_name, ctdb->pnn);
830
831         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
832         if (ctdb->valgrinding) {
833                 tdb_flags |= TDB_NOMMAP;
834         }
835         tdb_flags |= TDB_DISALLOW_NESTING;
836         if (jenkinshash) {
837                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
838         }
839 #ifdef TDB_MUTEX_LOCKING
840         if (ctdb->tunable.mutex_enabled && mutexes &&
841             tdb_runtime_check_for_robust_mutexes()) {
842                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
843         }
844 #endif
845
846 again:
847         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
848                                       ctdb->tunable.database_hash_size, 
849                                       tdb_flags, 
850                                       O_CREAT|O_RDWR, mode);
851         if (ctdb_db->ltdb == NULL) {
852                 struct stat st;
853                 int saved_errno = errno;
854
855                 if (!persistent) {
856                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
857                                           ctdb_db->db_path,
858                                           saved_errno,
859                                           strerror(saved_errno)));
860                         talloc_free(ctdb_db);
861                         return -1;
862                 }
863
864                 if (remaining_tries == 0) {
865                         DEBUG(DEBUG_CRIT,(__location__
866                                           "Failed to open persistent tdb '%s': %d - %s\n",
867                                           ctdb_db->db_path,
868                                           saved_errno,
869                                           strerror(saved_errno)));
870                         talloc_free(ctdb_db);
871                         return -1;
872                 }
873
874                 ret = stat(ctdb_db->db_path, &st);
875                 if (ret != 0) {
876                         DEBUG(DEBUG_CRIT,(__location__
877                                           "Failed to open persistent tdb '%s': %d - %s\n",
878                                           ctdb_db->db_path,
879                                           saved_errno,
880                                           strerror(saved_errno)));
881                         talloc_free(ctdb_db);
882                         return -1;
883                 }
884
885                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
886                 if (ret != 0) {
887                         DEBUG(DEBUG_CRIT,(__location__
888                                           "Failed to open persistent tdb '%s': %d - %s\n",
889                                           ctdb_db->db_path,
890                                           saved_errno,
891                                           strerror(saved_errno)));
892                         talloc_free(ctdb_db);
893                         return -1;
894                 }
895
896                 remaining_tries--;
897                 mode = st.st_mode;
898                 goto again;
899         }
900
901         if (!persistent) {
902                 ctdb_check_db_empty(ctdb_db);
903         } else {
904                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
905                 if (ret != 0) {
906                         int fd;
907                         struct stat st;
908
909                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
910                                           ctdb_db->db_path, ret,
911                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
912                         if (remaining_tries == 0) {
913                                 talloc_free(ctdb_db);
914                                 return -1;
915                         }
916
917                         fd = tdb_fd(ctdb_db->ltdb->tdb);
918                         ret = fstat(fd, &st);
919                         if (ret != 0) {
920                                 DEBUG(DEBUG_CRIT,(__location__
921                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
922                                                   ctdb_db->db_path,
923                                                   errno,
924                                                   strerror(errno)));
925                                 talloc_free(ctdb_db);
926                                 return -1;
927                         }
928
929                         /* close the TDB */
930                         talloc_free(ctdb_db->ltdb);
931                         ctdb_db->ltdb = NULL;
932
933                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
934                         if (ret != 0) {
935                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
936                                                   ctdb_db->db_path));
937                                 talloc_free(ctdb_db);
938                                 return -1;
939                         }
940
941                         remaining_tries--;
942                         mode = st.st_mode;
943                         goto again;
944                 }
945         }
946
947         /* set up a rb tree we can use to track which records we have a 
948            fetch-lock in-flight for so we can defer any additional calls
949            for the same record.
950          */
951         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
952         if (ctdb_db->deferred_fetch == NULL) {
953                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
954                 talloc_free(ctdb_db);
955                 return -1;
956         }
957
958         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
959         if (ctdb_db->defer_dmaster == NULL) {
960                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
961                                   ctdb_db->db_name));
962                 talloc_free(ctdb_db);
963                 return -1;
964         }
965
966         DLIST_ADD(ctdb->db_list, ctdb_db);
967
968         /* setting this can help some high churn databases */
969         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
970
971         /* 
972            all databases support the "null" function. we need this in
973            order to do forced migration of records
974         */
975         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
976         if (ret != 0) {
977                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
978                 talloc_free(ctdb_db);
979                 return -1;
980         }
981
982         /* 
983            all databases support the "fetch" function. we need this
984            for efficient Samba3 ctdb fetch
985         */
986         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
987         if (ret != 0) {
988                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
989                 talloc_free(ctdb_db);
990                 return -1;
991         }
992
993         /* 
994            all databases support the "fetch_with_header" function. we need this
995            for efficient readonly record fetches
996         */
997         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
998         if (ret != 0) {
999                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1000                 talloc_free(ctdb_db);
1001                 return -1;
1002         }
1003
1004         ret = ctdb_vacuum_init(ctdb_db);
1005         if (ret != 0) {
1006                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1007                                   "database '%s'\n", ctdb_db->db_name));
1008                 talloc_free(ctdb_db);
1009                 return -1;
1010         }
1011
1012
1013         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1014                             ctdb_db->db_path, tdb_flags));
1015
1016         /* success */
1017         return 0;
1018 }
1019
1020
/*
  one DB attach control that has been put on hold; instances are kept
  on the ctdb->deferred_attach list
 */
struct ctdb_deferred_attach_context {
	/* list linkage used by the DLIST_* macros */
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	/* daemon context whose deferred_attach list holds this entry */
	struct ctdb_context *ctdb;
	/* the deferred control request packet */
	struct ctdb_req_control *c;
};
1026
1027
1028 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1029 {
1030         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1031
1032         return 0;
1033 }
1034
1035 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1036 {
1037         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1038         struct ctdb_context *ctdb = da_ctx->ctdb;
1039
1040         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1041         talloc_free(da_ctx);
1042 }
1043
1044 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1045 {
1046         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1047         struct ctdb_context *ctdb = da_ctx->ctdb;
1048
1049         /* This talloc-steals the packet ->c */
1050         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1051         talloc_free(da_ctx);
1052 }
1053
1054 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1055 {
1056         struct ctdb_deferred_attach_context *da_ctx;
1057
1058         /* call it from the main event loop as soon as the current event 
1059            finishes.
1060          */
1061         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1062                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1063                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1064         }
1065
1066         return 0;
1067 }
1068
1069 /*
1070   a client has asked to attach a new database
1071  */
1072 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1073                                TDB_DATA *outdata, uint64_t tdb_flags, 
1074                                bool persistent, uint32_t client_id,
1075                                struct ctdb_req_control *c,
1076                                bool *async_reply)
1077 {
1078         const char *db_name = (const char *)indata.dptr;
1079         struct ctdb_db_context *db;
1080         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1081         struct ctdb_client *client = NULL;
1082         bool with_jenkinshash, with_mutexes;
1083
1084         if (ctdb->tunable.allow_client_db_attach == 0) {
1085                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1086                                   "AllowClientDBAccess == 0\n", db_name));
1087                 return -1;
1088         }
1089
1090         /* dont allow any local clients to attach while we are in recovery mode
1091          * except for the recovery daemon.
1092          * allow all attach from the network since these are always from remote
1093          * recovery daemons.
1094          */
1095         if (client_id != 0) {
1096                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1097         }
1098         if (client != NULL) {
1099                 /* If the node is inactive it is not part of the cluster
1100                    and we should not allow clients to attach to any
1101                    databases
1102                 */
1103                 if (node->flags & NODE_FLAGS_INACTIVE) {
1104                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1105                         return -1;
1106                 }
1107
1108                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1109                     client->pid != ctdb->recoverd_pid &&
1110                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1111                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1112
1113                         if (da_ctx == NULL) {
1114                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1115                                 return -1;
1116                         }
1117
1118                         da_ctx->ctdb = ctdb;
1119                         da_ctx->c = talloc_steal(da_ctx, c);
1120                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1121                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1122
1123                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1124
1125                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1126                         *async_reply = true;
1127                         return 0;
1128                 }
1129         }
1130
1131         /* the client can optionally pass additional tdb flags, but we
1132            only allow a subset of those on the database in ctdb. Note
1133            that tdb_flags is passed in via the (otherwise unused)
1134            srvid to the attach control */
1135 #ifdef TDB_MUTEX_LOCKING
1136         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1137 #else
1138         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1139 #endif
1140
1141         /* see if we already have this name */
1142         db = ctdb_db_handle(ctdb, db_name);
1143         if (db) {
1144                 if (db->persistent != persistent) {
1145                         DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1146                                           "database %s\n", persistent ? "" : "non-",
1147                                           db-> persistent ? "" : "non-", db_name));
1148                         return -1;
1149                 }
1150                 outdata->dptr  = (uint8_t *)&db->db_id;
1151                 outdata->dsize = sizeof(db->db_id);
1152                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1153                 return 0;
1154         }
1155
1156         with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1157 #ifdef TDB_MUTEX_LOCKING
1158         with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1159 #else
1160         with_mutexes = false;
1161 #endif
1162
1163         if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1164                               with_jenkinshash, with_mutexes) != 0) {
1165                 return -1;
1166         }
1167
1168         db = ctdb_db_handle(ctdb, db_name);
1169         if (!db) {
1170                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1171                 return -1;
1172         }
1173
1174         /* remember the flags the client has specified */
1175         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1176
1177         outdata->dptr  = (uint8_t *)&db->db_id;
1178         outdata->dsize = sizeof(db->db_id);
1179
1180         /* Try to ensure it's locked in mem */
1181         lockdown_memory(ctdb->valgrinding);
1182
1183         /* tell all the other nodes about this database */
1184         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1185                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1186                                                 CTDB_CONTROL_DB_ATTACH,
1187                                  0, CTDB_CTRL_FLAG_NOREPLY,
1188                                  indata, NULL, NULL);
1189
1190         /* success */
1191         return 0;
1192 }
1193
1194 /*
1195  * a client has asked to detach from a database
1196  */
1197 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1198                                uint32_t client_id)
1199 {
1200         uint32_t db_id;
1201         struct ctdb_db_context *ctdb_db;
1202         struct ctdb_client *client = NULL;
1203
1204         db_id = *(uint32_t *)indata.dptr;
1205         ctdb_db = find_ctdb_db(ctdb, db_id);
1206         if (ctdb_db == NULL) {
1207                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1208                                   db_id));
1209                 return -1;
1210         }
1211
1212         if (ctdb->tunable.allow_client_db_attach == 1) {
1213                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1214                                   "Clients are allowed access to databases "
1215                                   "(AllowClientDBAccess == 1)\n",
1216                                   ctdb_db->db_name));
1217                 return -1;
1218         }
1219
1220         if (ctdb_db->persistent) {
1221                 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1222                                   "denied\n", ctdb_db->db_name));
1223                 return -1;
1224         }
1225
1226         /* Cannot detach from database when in recovery */
1227         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1228                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1229                 return -1;
1230         }
1231
1232         /* If a control comes from a client, then broadcast it to all nodes.
1233          * Do the actual detach only if the control comes from other daemons.
1234          */
1235         if (client_id != 0) {
1236                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1237                 if (client != NULL) {
1238                         /* forward the control to all the nodes */
1239                         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1240                                                  CTDB_CONTROL_DB_DETACH, 0,
1241                                                  CTDB_CTRL_FLAG_NOREPLY,
1242                                                  indata, NULL, NULL);
1243                         return 0;
1244                 }
1245                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1246                                   "for database '%s'\n", ctdb_db->db_name));
1247                 return -1;
1248         }
1249
1250         /* Detach database from recoverd */
1251         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1252                                      CTDB_SRVID_DETACH_DATABASE,
1253                                      indata) != 0) {
1254                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1255                 return -1;
1256         }
1257
1258         /* Disable vacuuming and drop all vacuuming data */
1259         talloc_free(ctdb_db->vacuum_handle);
1260         talloc_free(ctdb_db->delete_queue);
1261
1262         /* Terminate any deferred fetch */
1263         talloc_free(ctdb_db->deferred_fetch);
1264
1265         /* Terminate any traverses */
1266         while (ctdb_db->traverse) {
1267                 talloc_free(ctdb_db->traverse);
1268         }
1269
1270         /* Terminate any revokes */
1271         while (ctdb_db->revokechild_active) {
1272                 talloc_free(ctdb_db->revokechild_active);
1273         }
1274
1275         /* Free readonly tracking database */
1276         if (ctdb_db->readonly) {
1277                 talloc_free(ctdb_db->rottdb);
1278         }
1279
1280         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1281
1282         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1283                              ctdb_db->db_name));
1284         talloc_free(ctdb_db);
1285
1286         return 0;
1287 }
1288
1289 /*
1290   attach to all existing persistent databases
1291  */
1292 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1293                                   const char *unhealthy_reason)
1294 {
1295         DIR *d;
1296         struct dirent *de;
1297
1298         /* open the persistent db directory and scan it for files */
1299         d = opendir(ctdb->db_directory_persistent);
1300         if (d == NULL) {
1301                 return 0;
1302         }
1303
1304         while ((de=readdir(d))) {
1305                 char *p, *s, *q;
1306                 size_t len = strlen(de->d_name);
1307                 uint32_t node;
1308                 int invalid_name = 0;
1309                 
1310                 s = talloc_strdup(ctdb, de->d_name);
1311                 if (s == NULL) {
1312                         closedir(d);
1313                         CTDB_NO_MEMORY(ctdb, s);
1314                 }
1315
1316                 /* only accept names ending in .tdb */
1317                 p = strstr(s, ".tdb.");
1318                 if (len < 7 || p == NULL) {
1319                         talloc_free(s);
1320                         continue;
1321                 }
1322
1323                 /* only accept names ending with .tdb. and any number of digits */
1324                 q = p+5;
1325                 while (*q != 0 && invalid_name == 0) {
1326                         if (!isdigit(*q++)) {
1327                                 invalid_name = 1;
1328                         }
1329                 }
1330                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1331                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1332                         talloc_free(s);
1333                         continue;
1334                 }
1335                 p[4] = 0;
1336
1337                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1338                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1339                         closedir(d);
1340                         talloc_free(s);
1341                         return -1;
1342                 }
1343
1344                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1345
1346                 talloc_free(s);
1347         }
1348         closedir(d);
1349         return 0;
1350 }
1351
1352 int ctdb_attach_databases(struct ctdb_context *ctdb)
1353 {
1354         int ret;
1355         char *persistent_health_path = NULL;
1356         char *unhealthy_reason = NULL;
1357         bool first_try = true;
1358
1359         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1360                                                  ctdb->db_directory_state,
1361                                                  PERSISTENT_HEALTH_TDB,
1362                                                  ctdb->pnn);
1363         if (persistent_health_path == NULL) {
1364                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1365                 return -1;
1366         }
1367
1368 again:
1369
1370         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1371                                                    0, TDB_DISALLOW_NESTING,
1372                                                    O_CREAT | O_RDWR, 0600);
1373         if (ctdb->db_persistent_health == NULL) {
1374                 struct tdb_wrap *tdb;
1375
1376                 if (!first_try) {
1377                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1378                                           persistent_health_path,
1379                                           errno,
1380                                           strerror(errno)));
1381                         talloc_free(persistent_health_path);
1382                         talloc_free(unhealthy_reason);
1383                         return -1;
1384                 }
1385                 first_try = false;
1386
1387                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1388                                                    persistent_health_path,
1389                                                    "was cleared after a failure",
1390                                                    "manual verification needed");
1391                 if (unhealthy_reason == NULL) {
1392                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1393                         talloc_free(persistent_health_path);
1394                         return -1;
1395                 }
1396
1397                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1398                                   persistent_health_path));
1399                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1400                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1401                                     O_CREAT | O_RDWR, 0600);
1402                 if (tdb) {
1403                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1404                                           persistent_health_path,
1405                                           errno,
1406                                           strerror(errno)));
1407                         talloc_free(persistent_health_path);
1408                         talloc_free(unhealthy_reason);
1409                         return -1;
1410                 }
1411
1412                 talloc_free(tdb);
1413                 goto again;
1414         }
1415         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1416         if (ret != 0) {
1417                 struct tdb_wrap *tdb;
1418
1419                 talloc_free(ctdb->db_persistent_health);
1420                 ctdb->db_persistent_health = NULL;
1421
1422                 if (!first_try) {
1423                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1424                                           persistent_health_path));
1425                         talloc_free(persistent_health_path);
1426                         talloc_free(unhealthy_reason);
1427                         return -1;
1428                 }
1429                 first_try = false;
1430
1431                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1432                                                    persistent_health_path,
1433                                                    "was cleared after a failure",
1434                                                    "manual verification needed");
1435                 if (unhealthy_reason == NULL) {
1436                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1437                         talloc_free(persistent_health_path);
1438                         return -1;
1439                 }
1440
1441                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1442                                   persistent_health_path));
1443                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1444                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1445                                     O_CREAT | O_RDWR, 0600);
1446                 if (tdb) {
1447                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1448                                           persistent_health_path,
1449                                           errno,
1450                                           strerror(errno)));
1451                         talloc_free(persistent_health_path);
1452                         talloc_free(unhealthy_reason);
1453                         return -1;
1454                 }
1455
1456                 talloc_free(tdb);
1457                 goto again;
1458         }
1459         talloc_free(persistent_health_path);
1460
1461         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1462         talloc_free(unhealthy_reason);
1463         if (ret != 0) {
1464                 return ret;
1465         }
1466
1467         return 0;
1468 }
1469
1470 /*
1471   called when a broadcast seqnum update comes in
1472  */
1473 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1474 {
1475         struct ctdb_db_context *ctdb_db;
1476         if (srcnode == ctdb->pnn) {
1477                 /* don't update ourselves! */
1478                 return 0;
1479         }
1480
1481         ctdb_db = find_ctdb_db(ctdb, db_id);
1482         if (!ctdb_db) {
1483                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1484                 return -1;
1485         }
1486
1487         if (ctdb_db->unhealthy_reason) {
1488                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1489                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1490                 return -1;
1491         }
1492
1493         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1494         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1495         return 0;
1496 }
1497
1498 /*
1499   timer to check for seqnum changes in a ltdb and propogate them
1500  */
1501 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1502                                    struct timeval t, void *p)
1503 {
1504         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1505         struct ctdb_context *ctdb = ctdb_db->ctdb;
1506         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1507         if (new_seqnum != ctdb_db->seqnum) {
1508                 /* something has changed - propogate it */
1509                 TDB_DATA data;
1510                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1511                 data.dsize = sizeof(uint32_t);
1512                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1513                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1514                                          data, NULL, NULL);             
1515         }
1516         ctdb_db->seqnum = new_seqnum;
1517
1518         /* setup a new timer */
1519         ctdb_db->seqnum_update =
1520                 event_add_timed(ctdb->ev, ctdb_db, 
1521                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1522                                 ctdb_ltdb_seqnum_check, ctdb_db);
1523 }
1524
1525 /*
1526   enable seqnum handling on this db
1527  */
1528 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1529 {
1530         struct ctdb_db_context *ctdb_db;
1531         ctdb_db = find_ctdb_db(ctdb, db_id);
1532         if (!ctdb_db) {
1533                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1534                 return -1;
1535         }
1536
1537         if (ctdb_db->seqnum_update == NULL) {
1538                 ctdb_db->seqnum_update =
1539                         event_add_timed(ctdb->ev, ctdb_db, 
1540                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1541                                         ctdb_ltdb_seqnum_check, ctdb_db);
1542         }
1543
1544         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1545         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1546         return 0;
1547 }
1548
1549 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata,
1550                                      uint32_t client_id)
1551 {
1552         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1553         struct ctdb_db_context *ctdb_db;
1554
1555         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1556         if (!ctdb_db) {
1557                 if (!(ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE)) {
1558                         DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
1559                                          db_prio->db_id));
1560                 }
1561                 return 0;
1562         }
1563
1564         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1565                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1566                 return 0;
1567         }
1568
1569         ctdb_db->priority = db_prio->priority;
1570         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1571
1572         if (client_id != 0) {
1573                 /* Broadcast the update to the rest of the cluster */
1574                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1575                                          CTDB_CONTROL_SET_DB_PRIORITY, 0,
1576                                          CTDB_CTRL_FLAG_NOREPLY, indata,
1577                                          NULL, NULL);
1578         }
1579         return 0;
1580 }
1581
1582
1583 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1584 {
1585         if (ctdb_db->sticky) {
1586                 return 0;
1587         }
1588
1589         if (ctdb_db->persistent) {
1590                 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1591                 return -1;
1592         }
1593
1594         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1595
1596         ctdb_db->sticky = true;
1597
1598         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1599
1600         return 0;
1601 }
1602
1603 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1604                                 uint32_t db_id,
1605                                 TDB_DATA *outdata)
1606 {
1607         struct ctdb_db_context *ctdb_db;
1608         struct ctdb_db_statistics *stats;
1609         int i;
1610         int len;
1611         char *ptr;
1612
1613         ctdb_db = find_ctdb_db(ctdb, db_id);
1614         if (!ctdb_db) {
1615                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1616                 return -1;
1617         }
1618
1619         len = offsetof(struct ctdb_db_statistics, hot_keys_wire);
1620         for (i = 0; i < MAX_HOT_KEYS; i++) {
1621                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1622         }
1623
1624         stats = talloc_size(outdata, len);
1625         if (stats == NULL) {
1626                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1627                 return -1;
1628         }
1629
1630         *stats = ctdb_db->statistics;
1631
1632         stats->num_hot_keys = MAX_HOT_KEYS;
1633
1634         ptr = &stats->hot_keys_wire[0];
1635         for (i = 0; i < MAX_HOT_KEYS; i++) {
1636                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1637                        ctdb_db->statistics.hot_keys[i].key.dsize);
1638                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1639         }
1640
1641         outdata->dptr  = (uint8_t *)stats;
1642         outdata->dsize = len;
1643
1644         return 0;
1645 }