merged patch from tridge
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "../include/ctdb_private.h"
26 #include "db_wrap.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30   this is the dummy null procedure that all databases support
31 */
32 static int ctdb_null_func(struct ctdb_call_info *call)
33 {
34         return 0;
35 }
36
37 /*
38   this is a plain fetch procedure that all databases support
39 */
40 static int ctdb_fetch_func(struct ctdb_call_info *call)
41 {
42         call->reply_data = &call->record_data;
43         return 0;
44 }
45
46
47
48 struct lock_fetch_state {
49         struct ctdb_context *ctdb;
50         void (*recv_pkt)(void *, struct ctdb_req_header *);
51         void *recv_context;
52         struct ctdb_req_header *hdr;
53         uint32_t generation;
54         bool ignore_generation;
55 };
56
57 /*
58   called when we should retry the operation
59  */
60 static void lock_fetch_callback(void *p)
61 {
62         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
63         if (!state->ignore_generation &&
64             state->generation != state->ctdb->vnn_map->generation) {
65                 DEBUG(0,("Discarding previous generation lockwait packet\n"));
66                 talloc_free(state->hdr);
67                 return;
68         }
69         state->recv_pkt(state->recv_context, state->hdr);
70         DEBUG(2,(__location__ " PACKET REQUEUED\n"));
71 }
72
73
74 /*
75   do a non-blocking ltdb_lock, deferring this ctdb request until we
76   have the chainlock
77
78   It does the following:
79
80    1) tries to get the chainlock. If it succeeds, then it returns 0
81
82    2) if it fails to get a chainlock immediately then it sets up a
83    non-blocking chainlock via ctdb_lockwait, and when it gets the
84    chainlock it re-submits this ctdb request to the main packet
85    receive function
86
87    This effectively queues all ctdb requests that cannot be
88    immediately satisfied until it can get the lock. This means that
89    the main ctdb daemon will not block waiting for a chainlock held by
90    a client
91
92    There are 3 possible return values:
93
94        0:    means that it got the lock immediately.
95       -1:    means that it failed to get the lock, and won't retry
96       -2:    means that it failed to get the lock immediately, but will retry
97  */
98 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
99                            TDB_DATA key, struct ctdb_req_header *hdr,
100                            void (*recv_pkt)(void *, struct ctdb_req_header *),
101                            void *recv_context, bool ignore_generation)
102 {
103         int ret;
104         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
105         struct lockwait_handle *h;
106         struct lock_fetch_state *state;
107         
108         ret = tdb_chainlock_nonblock(tdb, key);
109
110         if (ret != 0 &&
111             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
112                 /* a hard failure - don't try again */
113                 return -1;
114         }
115
116         /* when torturing, ensure we test the contended path */
117         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
118             random() % 5 == 0) {
119                 ret = -1;
120                 tdb_chainunlock(tdb, key);
121         }
122
123         /* first the non-contended path */
124         if (ret == 0) {
125                 return 0;
126         }
127
128         state = talloc(hdr, struct lock_fetch_state);
129         state->ctdb = ctdb_db->ctdb;
130         state->hdr = hdr;
131         state->recv_pkt = recv_pkt;
132         state->recv_context = recv_context;
133         state->generation = ctdb_db->ctdb->vnn_map->generation;
134         state->ignore_generation = ignore_generation;
135
136         /* now the contended path */
137         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
138         if (h == NULL) {
139                 tdb_chainunlock(tdb, key);
140                 return -1;
141         }
142
143         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
144            so it won't be freed yet */
145         talloc_steal(state, hdr);
146         talloc_steal(state, h);
147
148         /* now tell the caller than we will retry asynchronously */
149         return -2;
150 }
151
152 /*
153   a varient of ctdb_ltdb_lock_requeue that also fetches the record
154  */
155 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
156                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
157                                  struct ctdb_req_header *hdr, TDB_DATA *data,
158                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
159                                  void *recv_context, bool ignore_generation)
160 {
161         int ret;
162
163         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
164                                      recv_context, ignore_generation);
165         if (ret == 0) {
166                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
167                 if (ret != 0) {
168                         ctdb_ltdb_unlock(ctdb_db, key);
169                 }
170         }
171         return ret;
172 }
173
174
175 /*
176   paraoid check to see if the db is empty
177  */
178 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
179 {
180         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
181         int count = tdb_traverse_read(tdb, NULL, NULL);
182         if (count != 0) {
183                 DEBUG(0,(__location__ " tdb '%s' not empty on attach! aborting\n",
184                          ctdb_db->db_path));
185                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
186         }
187 }
188
189 /*
190   a client has asked to attach a new database
191  */
192 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
193                                TDB_DATA *outdata)
194 {
195         const char *db_name = (const char *)indata.dptr;
196         struct ctdb_db_context *ctdb_db, *tmp_db;
197         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
198         int ret;
199
200         /* If the node is inactive it is not part of the cluster
201            and we should not allow clients to attach to any
202            databases
203         */
204         if (node->flags & NODE_FLAGS_INACTIVE) {
205                 DEBUG(0,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
206                 return -1;
207         }
208
209
210         /* see if we already have this name */
211         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
212                 if (strcmp(db_name, tmp_db->db_name) == 0) {
213                         /* this is not an error */
214                         outdata->dptr  = (uint8_t *)&tmp_db->db_id;
215                         outdata->dsize = sizeof(tmp_db->db_id);
216                         return 0;
217                 }
218         }
219
220         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
221         CTDB_NO_MEMORY(ctdb, ctdb_db);
222
223         ctdb_db->ctdb = ctdb;
224         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
225         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
226
227         ctdb_db->db_id = ctdb_hash(&indata);
228
229         outdata->dptr  = (uint8_t *)&ctdb_db->db_id;
230         outdata->dsize = sizeof(ctdb_db->db_id);
231
232         /* check for hash collisions */
233         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
234                 if (tmp_db->db_id == ctdb_db->db_id) {
235                         DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
236                                  tmp_db->db_id, db_name, tmp_db->db_name));
237                         talloc_free(ctdb_db);
238                         return -1;
239                 }
240         }
241
242         if (ctdb->db_directory == NULL) {
243                 ctdb->db_directory = VARDIR "/ctdb";
244         }
245
246         /* make sure the db directory exists */
247         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
248                 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
249                          ctdb->db_directory));
250                 talloc_free(ctdb_db);
251                 return -1;
252         }
253
254         /* open the database */
255         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
256                                            ctdb->db_directory, 
257                                            db_name, ctdb->pnn);
258
259         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
260                                       ctdb->tunable.database_hash_size, 
261                                       TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
262         if (ctdb_db->ltdb == NULL) {
263                 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
264                 talloc_free(ctdb_db);
265                 return -1;
266         }
267
268         ctdb_check_db_empty(ctdb_db);
269
270         DLIST_ADD(ctdb->db_list, ctdb_db);
271
272         /* 
273            all databases support the "null" function. we need this in
274            order to do forced migration of records
275         */
276         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
277         if (ret != 0) {
278                 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
279                 talloc_free(ctdb_db);
280                 return -1;
281         }
282
283         /* 
284            all databases support the "fetch" function. we need this
285            for efficient Samba3 ctdb fetch
286         */
287         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
288         if (ret != 0) {
289                 DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
290                 talloc_free(ctdb_db);
291                 return -1;
292         }
293         
294         /* tell all the other nodes about this database */
295         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
296                                  CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
297                                  indata, NULL, NULL);
298
299         DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
300
301         /* success */
302         return 0;
303 }
304
305 /*
306   called when a broadcast seqnum update comes in
307  */
308 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
309 {
310         struct ctdb_db_context *ctdb_db;
311         if (srcnode == ctdb->pnn) {
312                 /* don't update ourselves! */
313                 return 0;
314         }
315
316         ctdb_db = find_ctdb_db(ctdb, db_id);
317         if (!ctdb_db) {
318                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
319                 return -1;
320         }
321
322         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
323         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
324         return 0;
325 }
326
327 /*
328   timer to check for seqnum changes in a ltdb and propogate them
329  */
330 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
331                                    struct timeval t, void *p)
332 {
333         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
334         struct ctdb_context *ctdb = ctdb_db->ctdb;
335         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
336         if (new_seqnum != ctdb_db->seqnum) {
337                 /* something has changed - propogate it */
338                 TDB_DATA data;
339                 data.dptr = (uint8_t *)&ctdb_db->db_id;
340                 data.dsize = sizeof(uint32_t);
341                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
342                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
343                                          data, NULL, NULL);             
344         }
345         ctdb_db->seqnum = new_seqnum;
346
347         /* setup a new timer */
348         ctdb_db->te = 
349                 event_add_timed(ctdb->ev, ctdb_db, 
350                                 timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
351                                 ctdb_ltdb_seqnum_check, ctdb_db);
352 }
353
354 /*
355   enable seqnum handling on this db
356  */
357 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
358 {
359         struct ctdb_db_context *ctdb_db;
360         ctdb_db = find_ctdb_db(ctdb, db_id);
361         if (!ctdb_db) {
362                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
363                 return -1;
364         }
365
366         if (ctdb_db->te == NULL) {
367                 ctdb_db->te = 
368                         event_add_timed(ctdb->ev, ctdb_db, 
369                                         timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
370                                         ctdb_ltdb_seqnum_check, ctdb_db);
371         }
372
373         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
374         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
375         return 0;
376 }
377