iIt is better to plainly disallow clietnts from connecting here
[metze/ctdb/wip.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include <ctype.h>
30
31 /*
32   this is the dummy null procedure that all databases support
33 */
34 static int ctdb_null_func(struct ctdb_call_info *call)
35 {
36         return 0;
37 }
38
39 /*
40   this is a plain fetch procedure that all databases support
41 */
42 static int ctdb_fetch_func(struct ctdb_call_info *call)
43 {
44         call->reply_data = &call->record_data;
45         return 0;
46 }
47
48
49
50 struct lock_fetch_state {
51         struct ctdb_context *ctdb;
52         void (*recv_pkt)(void *, struct ctdb_req_header *);
53         void *recv_context;
54         struct ctdb_req_header *hdr;
55         uint32_t generation;
56         bool ignore_generation;
57 };
58
59 /*
60   called when we should retry the operation
61  */
62 static void lock_fetch_callback(void *p)
63 {
64         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
65         if (!state->ignore_generation &&
66             state->generation != state->ctdb->vnn_map->generation) {
67                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
68                 talloc_free(state->hdr);
69                 return;
70         }
71         state->recv_pkt(state->recv_context, state->hdr);
72         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
73 }
74
75
76 /*
77   do a non-blocking ltdb_lock, deferring this ctdb request until we
78   have the chainlock
79
80   It does the following:
81
82    1) tries to get the chainlock. If it succeeds, then it returns 0
83
84    2) if it fails to get a chainlock immediately then it sets up a
85    non-blocking chainlock via ctdb_lockwait, and when it gets the
86    chainlock it re-submits this ctdb request to the main packet
87    receive function
88
89    This effectively queues all ctdb requests that cannot be
90    immediately satisfied until it can get the lock. This means that
91    the main ctdb daemon will not block waiting for a chainlock held by
92    a client
93
94    There are 3 possible return values:
95
96        0:    means that it got the lock immediately.
97       -1:    means that it failed to get the lock, and won't retry
98       -2:    means that it failed to get the lock immediately, but will retry
99  */
100 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
101                            TDB_DATA key, struct ctdb_req_header *hdr,
102                            void (*recv_pkt)(void *, struct ctdb_req_header *),
103                            void *recv_context, bool ignore_generation)
104 {
105         int ret;
106         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
107         struct lockwait_handle *h;
108         struct lock_fetch_state *state;
109         
110         ret = tdb_chainlock_nonblock(tdb, key);
111
112         if (ret != 0 &&
113             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
114                 /* a hard failure - don't try again */
115                 return -1;
116         }
117
118         /* when torturing, ensure we test the contended path */
119         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
120             random() % 5 == 0) {
121                 ret = -1;
122                 tdb_chainunlock(tdb, key);
123         }
124
125         /* first the non-contended path */
126         if (ret == 0) {
127                 return 0;
128         }
129
130         state = talloc(hdr, struct lock_fetch_state);
131         state->ctdb = ctdb_db->ctdb;
132         state->hdr = hdr;
133         state->recv_pkt = recv_pkt;
134         state->recv_context = recv_context;
135         state->generation = ctdb_db->ctdb->vnn_map->generation;
136         state->ignore_generation = ignore_generation;
137
138         /* now the contended path */
139         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
140         if (h == NULL) {
141                 tdb_chainunlock(tdb, key);
142                 return -1;
143         }
144
145         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
146            so it won't be freed yet */
147         talloc_steal(state, hdr);
148         talloc_steal(state, h);
149
150         /* now tell the caller than we will retry asynchronously */
151         return -2;
152 }
153
154 /*
155   a varient of ctdb_ltdb_lock_requeue that also fetches the record
156  */
157 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
158                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
159                                  struct ctdb_req_header *hdr, TDB_DATA *data,
160                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
161                                  void *recv_context, bool ignore_generation)
162 {
163         int ret;
164
165         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
166                                      recv_context, ignore_generation);
167         if (ret == 0) {
168                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
169                 if (ret != 0) {
170                         ctdb_ltdb_unlock(ctdb_db, key);
171                 }
172         }
173         return ret;
174 }
175
176
177 /*
178   paraoid check to see if the db is empty
179  */
180 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
181 {
182         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
183         int count = tdb_traverse_read(tdb, NULL, NULL);
184         if (count != 0) {
185                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
186                          ctdb_db->db_path));
187                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
188         }
189 }
190
191
192 /*
193   attach to a database, handling both persistent and non-persistent databases
194   return 0 on success, -1 on failure
195  */
196 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, bool persistent)
197 {
198         struct ctdb_db_context *ctdb_db, *tmp_db;
199         int ret;
200         struct TDB_DATA key;
201         unsigned tdb_flags;
202
203         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
204         CTDB_NO_MEMORY(ctdb, ctdb_db);
205
206         ctdb_db->priority = 1;
207         ctdb_db->ctdb = ctdb;
208         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
209         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
210
211         key.dsize = strlen(db_name)+1;
212         key.dptr  = discard_const(db_name);
213         ctdb_db->db_id = ctdb_hash(&key);
214         ctdb_db->persistent = persistent;
215
216         /* check for hash collisions */
217         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
218                 if (tmp_db->db_id == ctdb_db->db_id) {
219                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
220                                  tmp_db->db_id, db_name, tmp_db->db_name));
221                         talloc_free(ctdb_db);
222                         return -1;
223                 }
224         }
225
226         if (ctdb->db_directory == NULL) {
227                 ctdb->db_directory = VARDIR "/ctdb";
228         }
229
230         /* make sure the db directory exists */
231         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
232                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n", 
233                          ctdb->db_directory));
234                 talloc_free(ctdb_db);
235                 return -1;
236         }
237
238         if (persistent && mkdir(ctdb->db_directory_persistent, 0700) == -1 && errno != EEXIST) {
239                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n", 
240                          ctdb->db_directory_persistent));
241                 talloc_free(ctdb_db);
242                 return -1;
243         }
244
245         /* open the database */
246         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
247                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
248                                            db_name, ctdb->pnn);
249
250         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
251         if (!ctdb->do_setsched) {
252                 tdb_flags |= TDB_NOMMAP;
253         }
254
255         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
256                                       ctdb->tunable.database_hash_size, 
257                                       tdb_flags, 
258                                       O_CREAT|O_RDWR, 0666);
259         if (ctdb_db->ltdb == NULL) {
260                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s'\n", ctdb_db->db_path));
261                 talloc_free(ctdb_db);
262                 return -1;
263         }
264
265         if (!persistent) {
266                 ctdb_check_db_empty(ctdb_db);
267         }
268
269         DLIST_ADD(ctdb->db_list, ctdb_db);
270
271         /* setting this can help some high churn databases */
272         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
273
274         /* 
275            all databases support the "null" function. we need this in
276            order to do forced migration of records
277         */
278         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
279         if (ret != 0) {
280                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
281                 talloc_free(ctdb_db);
282                 return -1;
283         }
284
285         /* 
286            all databases support the "fetch" function. we need this
287            for efficient Samba3 ctdb fetch
288         */
289         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
290         if (ret != 0) {
291                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
292                 talloc_free(ctdb_db);
293                 return -1;
294         }
295
296         ret = ctdb_vacuum_init(ctdb_db);
297         if (ret != 0) {
298                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
299                                   "database '%s'\n", ctdb_db->db_name));
300                 talloc_free(ctdb_db);
301                 return -1;
302         }
303
304
305         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
306         
307         /* success */
308         return 0;
309 }
310
311
312 /*
313   a client has asked to attach a new database
314  */
315 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
316                                TDB_DATA *outdata, uint64_t tdb_flags, 
317                                bool persistent)
318 {
319         const char *db_name = (const char *)indata.dptr;
320         struct ctdb_db_context *db;
321         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
322
323         /* the client can optionally pass additional tdb flags, but we
324            only allow a subset of those on the database in ctdb. Note
325            that tdb_flags is passed in via the (otherwise unused)
326            srvid to the attach control */
327         tdb_flags &= TDB_NOSYNC;
328
329         /* If the node is inactive it is not part of the cluster
330            and we should not allow clients to attach to any
331            databases
332         */
333         if (node->flags & NODE_FLAGS_INACTIVE) {
334                 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
335                 return -1;
336         }
337
338
339         /* see if we already have this name */
340         db = ctdb_db_handle(ctdb, db_name);
341         if (db) {
342                 outdata->dptr  = (uint8_t *)&db->db_id;
343                 outdata->dsize = sizeof(db->db_id);
344                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
345                 return 0;
346         }
347
348         if (ctdb_local_attach(ctdb, db_name, persistent) != 0) {
349                 return -1;
350         }
351
352         db = ctdb_db_handle(ctdb, db_name);
353         if (!db) {
354                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
355                 return -1;
356         }
357
358         /* remember the flags the client has specified */
359         tdb_add_flags(db->ltdb->tdb, tdb_flags);
360
361         outdata->dptr  = (uint8_t *)&db->db_id;
362         outdata->dsize = sizeof(db->db_id);
363
364         /* tell all the other nodes about this database */
365         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
366                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
367                                                 CTDB_CONTROL_DB_ATTACH,
368                                  0, CTDB_CTRL_FLAG_NOREPLY,
369                                  indata, NULL, NULL);
370
371         /* success */
372         return 0;
373 }
374
375
376 /*
377   attach to all existing persistent databases
378  */
379 int ctdb_attach_persistent(struct ctdb_context *ctdb)
380 {
381         DIR *d;
382         struct dirent *de;
383
384         /* open the persistent db directory and scan it for files */
385         d = opendir(ctdb->db_directory_persistent);
386         if (d == NULL) {
387                 return 0;
388         }
389
390         while ((de=readdir(d))) {
391                 char *p, *s, *q;
392                 size_t len = strlen(de->d_name);
393                 uint32_t node;
394                 int invalid_name = 0;
395                 
396                 s = talloc_strdup(ctdb, de->d_name);
397                 CTDB_NO_MEMORY(ctdb, s);
398
399                 /* only accept names ending in .tdb */
400                 p = strstr(s, ".tdb.");
401                 if (len < 7 || p == NULL) {
402                         talloc_free(s);
403                         continue;
404                 }
405
406                 /* only accept names ending with .tdb. and any number of digits */
407                 q = p+5;
408                 while (*q != 0 && invalid_name == 0) {
409                         if (!isdigit(*q++)) {
410                                 invalid_name = 1;
411                         }
412                 }
413                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
414                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
415                         talloc_free(s);
416                         continue;
417                 }
418                 p[4] = 0;
419
420                 if (ctdb_local_attach(ctdb, s, true) != 0) {
421                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
422                         closedir(d);
423                         talloc_free(s);
424                         return -1;
425                 }
426                 DEBUG(DEBUG_NOTICE,("Attached to persistent database %s\n", s));
427
428                 talloc_free(s);
429         }
430         closedir(d);
431         return 0;
432 }
433
434 /*
435   called when a broadcast seqnum update comes in
436  */
437 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
438 {
439         struct ctdb_db_context *ctdb_db;
440         if (srcnode == ctdb->pnn) {
441                 /* don't update ourselves! */
442                 return 0;
443         }
444
445         ctdb_db = find_ctdb_db(ctdb, db_id);
446         if (!ctdb_db) {
447                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
448                 return -1;
449         }
450
451         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
452         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
453         return 0;
454 }
455
456 /*
457   timer to check for seqnum changes in a ltdb and propogate them
458  */
459 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
460                                    struct timeval t, void *p)
461 {
462         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
463         struct ctdb_context *ctdb = ctdb_db->ctdb;
464         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
465         if (new_seqnum != ctdb_db->seqnum) {
466                 /* something has changed - propogate it */
467                 TDB_DATA data;
468                 data.dptr = (uint8_t *)&ctdb_db->db_id;
469                 data.dsize = sizeof(uint32_t);
470                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
471                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
472                                          data, NULL, NULL);             
473         }
474         ctdb_db->seqnum = new_seqnum;
475
476         /* setup a new timer */
477         ctdb_db->te = 
478                 event_add_timed(ctdb->ev, ctdb_db, 
479                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
480                                 ctdb_ltdb_seqnum_check, ctdb_db);
481 }
482
483 /*
484   enable seqnum handling on this db
485  */
486 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
487 {
488         struct ctdb_db_context *ctdb_db;
489         ctdb_db = find_ctdb_db(ctdb, db_id);
490         if (!ctdb_db) {
491                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
492                 return -1;
493         }
494
495         if (ctdb_db->te == NULL) {
496                 ctdb_db->te = 
497                         event_add_timed(ctdb->ev, ctdb_db, 
498                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
499                                         ctdb_ltdb_seqnum_check, ctdb_db);
500         }
501
502         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
503         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
504         return 0;
505 }
506
507 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
508 {
509         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
510         struct ctdb_db_context *ctdb_db;
511
512         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
513         if (!ctdb_db) {
514                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
515                 return -1;
516         }
517
518         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
519                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
520                 return -1;
521         }
522
523         ctdb_db->priority = db_prio->priority;
524         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
525
526         return 0;
527 }
528
529