a new tunable DatabaseMaxDead that enables the tdb max dead cache logic
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29
30 /*
31   this is the dummy null procedure that all databases support
32 */
33 static int ctdb_null_func(struct ctdb_call_info *call)
34 {
35         return 0;
36 }
37
38 /*
39   this is a plain fetch procedure that all databases support
40 */
41 static int ctdb_fetch_func(struct ctdb_call_info *call)
42 {
43         call->reply_data = &call->record_data;
44         return 0;
45 }
46
47
48
49 struct lock_fetch_state {
50         struct ctdb_context *ctdb;
51         void (*recv_pkt)(void *, struct ctdb_req_header *);
52         void *recv_context;
53         struct ctdb_req_header *hdr;
54         uint32_t generation;
55         bool ignore_generation;
56 };
57
58 /*
59   called when we should retry the operation
60  */
61 static void lock_fetch_callback(void *p)
62 {
63         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
64         if (!state->ignore_generation &&
65             state->generation != state->ctdb->vnn_map->generation) {
66                 DEBUG(0,("Discarding previous generation lockwait packet\n"));
67                 talloc_free(state->hdr);
68                 return;
69         }
70         state->recv_pkt(state->recv_context, state->hdr);
71         DEBUG(2,(__location__ " PACKET REQUEUED\n"));
72 }
73
74
75 /*
76   do a non-blocking ltdb_lock, deferring this ctdb request until we
77   have the chainlock
78
79   It does the following:
80
81    1) tries to get the chainlock. If it succeeds, then it returns 0
82
83    2) if it fails to get a chainlock immediately then it sets up a
84    non-blocking chainlock via ctdb_lockwait, and when it gets the
85    chainlock it re-submits this ctdb request to the main packet
86    receive function
87
88    This effectively queues all ctdb requests that cannot be
89    immediately satisfied until it can get the lock. This means that
90    the main ctdb daemon will not block waiting for a chainlock held by
91    a client
92
93    There are 3 possible return values:
94
95        0:    means that it got the lock immediately.
96       -1:    means that it failed to get the lock, and won't retry
97       -2:    means that it failed to get the lock immediately, but will retry
98  */
99 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
100                            TDB_DATA key, struct ctdb_req_header *hdr,
101                            void (*recv_pkt)(void *, struct ctdb_req_header *),
102                            void *recv_context, bool ignore_generation)
103 {
104         int ret;
105         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
106         struct lockwait_handle *h;
107         struct lock_fetch_state *state;
108         
109         ret = tdb_chainlock_nonblock(tdb, key);
110
111         if (ret != 0 &&
112             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
113                 /* a hard failure - don't try again */
114                 return -1;
115         }
116
117         /* when torturing, ensure we test the contended path */
118         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
119             random() % 5 == 0) {
120                 ret = -1;
121                 tdb_chainunlock(tdb, key);
122         }
123
124         /* first the non-contended path */
125         if (ret == 0) {
126                 return 0;
127         }
128
129         state = talloc(hdr, struct lock_fetch_state);
130         state->ctdb = ctdb_db->ctdb;
131         state->hdr = hdr;
132         state->recv_pkt = recv_pkt;
133         state->recv_context = recv_context;
134         state->generation = ctdb_db->ctdb->vnn_map->generation;
135         state->ignore_generation = ignore_generation;
136
137         /* now the contended path */
138         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
139         if (h == NULL) {
140                 tdb_chainunlock(tdb, key);
141                 return -1;
142         }
143
144         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
145            so it won't be freed yet */
146         talloc_steal(state, hdr);
147         talloc_steal(state, h);
148
149         /* now tell the caller than we will retry asynchronously */
150         return -2;
151 }
152
153 /*
154   a varient of ctdb_ltdb_lock_requeue that also fetches the record
155  */
156 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
157                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
158                                  struct ctdb_req_header *hdr, TDB_DATA *data,
159                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
160                                  void *recv_context, bool ignore_generation)
161 {
162         int ret;
163
164         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
165                                      recv_context, ignore_generation);
166         if (ret == 0) {
167                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
168                 if (ret != 0) {
169                         ctdb_ltdb_unlock(ctdb_db, key);
170                 }
171         }
172         return ret;
173 }
174
175
176 /*
177   paraoid check to see if the db is empty
178  */
179 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
180 {
181         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
182         int count = tdb_traverse_read(tdb, NULL, NULL);
183         if (count != 0) {
184                 DEBUG(0,(__location__ " tdb '%s' not empty on attach! aborting\n",
185                          ctdb_db->db_path));
186                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
187         }
188 }
189
190
191 /*
192   attach to a database, handling both persistent and non-persistent databases
193   return 0 on success, -1 on failure
194  */
195 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, bool persistent)
196 {
197         struct ctdb_db_context *ctdb_db, *tmp_db;
198         int ret;
199         struct TDB_DATA key;
200
201         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
202         CTDB_NO_MEMORY(ctdb, ctdb_db);
203
204         ctdb_db->ctdb = ctdb;
205         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
206         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
207
208         key.dsize = strlen(db_name)+1;
209         key.dptr  = discard_const(db_name);
210         ctdb_db->db_id = ctdb_hash(&key);
211         ctdb_db->persistent = persistent;
212
213         /* check for hash collisions */
214         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
215                 if (tmp_db->db_id == ctdb_db->db_id) {
216                         DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
217                                  tmp_db->db_id, db_name, tmp_db->db_name));
218                         talloc_free(ctdb_db);
219                         return -1;
220                 }
221         }
222
223         if (ctdb->db_directory == NULL) {
224                 ctdb->db_directory = VARDIR "/ctdb";
225         }
226
227         /* make sure the db directory exists */
228         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
229                 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
230                          ctdb->db_directory));
231                 talloc_free(ctdb_db);
232                 return -1;
233         }
234
235         if (persistent && mkdir(ctdb->db_directory_persistent, 0700) == -1 && errno != EEXIST) {
236                 DEBUG(0,(__location__ " Unable to create ctdb persistent directory '%s'\n", 
237                          ctdb->db_directory_persistent));
238                 talloc_free(ctdb_db);
239                 return -1;
240         }
241
242         /* open the database */
243         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
244                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
245                                            db_name, ctdb->pnn);
246
247         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
248                                       ctdb->tunable.database_hash_size, 
249                                       persistent?TDB_DEFAULT:TDB_CLEAR_IF_FIRST, 
250                                       O_CREAT|O_RDWR, 0666);
251         if (ctdb_db->ltdb == NULL) {
252                 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
253                 talloc_free(ctdb_db);
254                 return -1;
255         }
256
257         if (!persistent) {
258                 ctdb_check_db_empty(ctdb_db);
259         }
260
261         DLIST_ADD(ctdb->db_list, ctdb_db);
262
263         /* setting this can help some high churn databases */
264         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
265
266         /* 
267            all databases support the "null" function. we need this in
268            order to do forced migration of records
269         */
270         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
271         if (ret != 0) {
272                 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
273                 talloc_free(ctdb_db);
274                 return -1;
275         }
276
277         /* 
278            all databases support the "fetch" function. we need this
279            for efficient Samba3 ctdb fetch
280         */
281         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
282         if (ret != 0) {
283                 DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
284                 talloc_free(ctdb_db);
285                 return -1;
286         }
287
288         DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
289         
290         /* success */
291         return 0;
292 }
293
294
295 /*
296   a client has asked to attach a new database
297  */
298 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
299                                TDB_DATA *outdata, bool persistent)
300 {
301         const char *db_name = (const char *)indata.dptr;
302         struct ctdb_db_context *db;
303         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
304
305         /* If the node is inactive it is not part of the cluster
306            and we should not allow clients to attach to any
307            databases
308         */
309         if (node->flags & NODE_FLAGS_INACTIVE) {
310                 DEBUG(0,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
311                 return -1;
312         }
313
314
315         /* see if we already have this name */
316         db = ctdb_db_handle(ctdb, db_name);
317         if (db) {
318                 outdata->dptr  = (uint8_t *)&db->db_id;
319                 outdata->dsize = sizeof(db->db_id);
320                 return 0;
321         }
322
323         if (ctdb_local_attach(ctdb, db_name, persistent) != 0) {
324                 return -1;
325         }
326
327         db = ctdb_db_handle(ctdb, db_name);
328         if (!db) {
329                 DEBUG(0,("Failed to find db handle for name '%s'\n", db_name));
330                 return -1;
331         }
332
333         outdata->dptr  = (uint8_t *)&db->db_id;
334         outdata->dsize = sizeof(db->db_id);
335
336         /* tell all the other nodes about this database */
337         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
338                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
339                                                 CTDB_CONTROL_DB_ATTACH,
340                                  0, CTDB_CTRL_FLAG_NOREPLY,
341                                  indata, NULL, NULL);
342
343         /* success */
344         return 0;
345 }
346
347
348 /*
349   attach to all existing persistent databases
350  */
351 int ctdb_attach_persistent(struct ctdb_context *ctdb)
352 {
353         DIR *d;
354         struct dirent *de;
355
356         /* open the persistent db directory and scan it for files */
357         d = opendir(ctdb->db_directory_persistent);
358         if (d == NULL) {
359                 return 0;
360         }
361
362         while ((de=readdir(d))) {
363                 char *p, *s;
364                 size_t len = strlen(de->d_name);
365                 uint32_t node;
366                 
367                 s = talloc_strdup(ctdb, de->d_name);
368                 CTDB_NO_MEMORY(ctdb, s);
369
370                 /* only accept names ending in .tdb */
371                 p = strstr(s, ".tdb.");
372                 if (len < 7 || p == NULL) {
373                         talloc_free(s);
374                         continue;
375                 }
376                 if (sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
377                         talloc_free(s);
378                         continue;
379                 }
380                 p[4] = 0;
381
382                 if (ctdb_local_attach(ctdb, s, true) != 0) {
383                         DEBUG(0,("Failed to attach to persistent database '%s'\n", de->d_name));
384                         closedir(d);
385                         talloc_free(s);
386                         return -1;
387                 }
388                 DEBUG(0,("Attached to persistent database %s\n", s));
389
390                 talloc_free(s);
391         }
392         closedir(d);
393         return 0;
394 }
395
396 /*
397   called when a broadcast seqnum update comes in
398  */
399 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
400 {
401         struct ctdb_db_context *ctdb_db;
402         if (srcnode == ctdb->pnn) {
403                 /* don't update ourselves! */
404                 return 0;
405         }
406
407         ctdb_db = find_ctdb_db(ctdb, db_id);
408         if (!ctdb_db) {
409                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
410                 return -1;
411         }
412
413         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
414         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
415         return 0;
416 }
417
418 /*
419   timer to check for seqnum changes in a ltdb and propogate them
420  */
421 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
422                                    struct timeval t, void *p)
423 {
424         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
425         struct ctdb_context *ctdb = ctdb_db->ctdb;
426         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
427         if (new_seqnum != ctdb_db->seqnum) {
428                 /* something has changed - propogate it */
429                 TDB_DATA data;
430                 data.dptr = (uint8_t *)&ctdb_db->db_id;
431                 data.dsize = sizeof(uint32_t);
432                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
433                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
434                                          data, NULL, NULL);             
435         }
436         ctdb_db->seqnum = new_seqnum;
437
438         /* setup a new timer */
439         ctdb_db->te = 
440                 event_add_timed(ctdb->ev, ctdb_db, 
441                                 timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
442                                 ctdb_ltdb_seqnum_check, ctdb_db);
443 }
444
445 /*
446   enable seqnum handling on this db
447  */
448 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
449 {
450         struct ctdb_db_context *ctdb_db;
451         ctdb_db = find_ctdb_db(ctdb, db_id);
452         if (!ctdb_db) {
453                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
454                 return -1;
455         }
456
457         if (ctdb_db->te == NULL) {
458                 ctdb_db->te = 
459                         event_add_timed(ctdb->ev, ctdb_db, 
460                                         timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
461                                         ctdb_ltdb_seqnum_check, ctdb_db);
462         }
463
464         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
465         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
466         return 0;
467 }
468