- merged ctdb_store test from ronnie
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 2 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29
30 /*
31   this is the dummy null procedure that all databases support
32 */
33 static int ctdb_null_func(struct ctdb_call_info *call)
34 {
35         return 0;
36 }
37
38 /*
39   this is a plain fetch procedure that all databases support
40 */
41 static int ctdb_fetch_func(struct ctdb_call_info *call)
42 {
43         call->reply_data = &call->record_data;
44         return 0;
45 }
46
47
48
49 struct lock_fetch_state {
50         struct ctdb_context *ctdb;
51         void (*recv_pkt)(void *, struct ctdb_req_header *);
52         void *recv_context;
53         struct ctdb_req_header *hdr;
54         uint32_t generation;
55         bool ignore_generation;
56 };
57
58 /*
59   called when we should retry the operation
60  */
61 static void lock_fetch_callback(void *p)
62 {
63         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
64         if (!state->ignore_generation &&
65             state->generation != state->ctdb->vnn_map->generation) {
66                 DEBUG(0,("Discarding previous generation lockwait packet\n"));
67                 talloc_free(state->hdr);
68                 return;
69         }
70         state->recv_pkt(state->recv_context, state->hdr);
71         DEBUG(2,(__location__ " PACKET REQUEUED\n"));
72 }
73
74
75 /*
76   do a non-blocking ltdb_lock, deferring this ctdb request until we
77   have the chainlock
78
79   It does the following:
80
81    1) tries to get the chainlock. If it succeeds, then it returns 0
82
83    2) if it fails to get a chainlock immediately then it sets up a
84    non-blocking chainlock via ctdb_lockwait, and when it gets the
85    chainlock it re-submits this ctdb request to the main packet
86    receive function
87
88    This effectively queues all ctdb requests that cannot be
89    immediately satisfied until it can get the lock. This means that
90    the main ctdb daemon will not block waiting for a chainlock held by
91    a client
92
93    There are 3 possible return values:
94
95        0:    means that it got the lock immediately.
96       -1:    means that it failed to get the lock, and won't retry
97       -2:    means that it failed to get the lock immediately, but will retry
98  */
99 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
100                            TDB_DATA key, struct ctdb_req_header *hdr,
101                            void (*recv_pkt)(void *, struct ctdb_req_header *),
102                            void *recv_context, bool ignore_generation)
103 {
104         int ret;
105         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
106         struct lockwait_handle *h;
107         struct lock_fetch_state *state;
108         
109         ret = tdb_chainlock_nonblock(tdb, key);
110
111         if (ret != 0 &&
112             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
113                 /* a hard failure - don't try again */
114                 return -1;
115         }
116
117         /* when torturing, ensure we test the contended path */
118         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
119             random() % 5 == 0) {
120                 ret = -1;
121                 tdb_chainunlock(tdb, key);
122         }
123
124         /* first the non-contended path */
125         if (ret == 0) {
126                 return 0;
127         }
128
129         state = talloc(hdr, struct lock_fetch_state);
130         state->ctdb = ctdb_db->ctdb;
131         state->hdr = hdr;
132         state->recv_pkt = recv_pkt;
133         state->recv_context = recv_context;
134         state->generation = ctdb_db->ctdb->vnn_map->generation;
135         state->ignore_generation = ignore_generation;
136
137         /* now the contended path */
138         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
139         if (h == NULL) {
140                 tdb_chainunlock(tdb, key);
141                 return -1;
142         }
143
144         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
145            so it won't be freed yet */
146         talloc_steal(state, hdr);
147         talloc_steal(state, h);
148
149         /* now tell the caller than we will retry asynchronously */
150         return -2;
151 }
152
153 /*
154   a varient of ctdb_ltdb_lock_requeue that also fetches the record
155  */
156 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
157                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
158                                  struct ctdb_req_header *hdr, TDB_DATA *data,
159                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
160                                  void *recv_context, bool ignore_generation)
161 {
162         int ret;
163
164         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
165                                      recv_context, ignore_generation);
166         if (ret == 0) {
167                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
168                 if (ret != 0) {
169                         ctdb_ltdb_unlock(ctdb_db, key);
170                 }
171         }
172         return ret;
173 }
174
175
176 /*
177   paraoid check to see if the db is empty
178  */
179 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
180 {
181         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
182         int count = tdb_traverse_read(tdb, NULL, NULL);
183         if (count != 0) {
184                 DEBUG(0,(__location__ " tdb '%s' not empty on attach! aborting\n",
185                          ctdb_db->db_path));
186                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
187         }
188 }
189
190 /*
191   a client has asked to attach a new database
192  */
193 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
194                                TDB_DATA *outdata)
195 {
196         const char *db_name = (const char *)indata.dptr;
197         struct ctdb_db_context *ctdb_db, *tmp_db;
198         int ret;
199
200         /* see if we already have this name */
201         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
202                 if (strcmp(db_name, tmp_db->db_name) == 0) {
203                         /* this is not an error */
204                         outdata->dptr  = (uint8_t *)&tmp_db->db_id;
205                         outdata->dsize = sizeof(tmp_db->db_id);
206                         return 0;
207                 }
208         }
209
210         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
211         CTDB_NO_MEMORY(ctdb, ctdb_db);
212
213         ctdb_db->ctdb = ctdb;
214         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
215         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
216
217         ctdb_db->db_id = ctdb_hash(&indata);
218
219         outdata->dptr  = (uint8_t *)&ctdb_db->db_id;
220         outdata->dsize = sizeof(ctdb_db->db_id);
221
222         /* check for hash collisions */
223         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
224                 if (tmp_db->db_id == ctdb_db->db_id) {
225                         DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
226                                  tmp_db->db_id, db_name, tmp_db->db_name));
227                         talloc_free(ctdb_db);
228                         return -1;
229                 }
230         }
231
232         if (ctdb->db_directory == NULL) {
233                 ctdb->db_directory = VARDIR "/ctdb";
234         }
235
236         /* make sure the db directory exists */
237         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
238                 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
239                          ctdb->db_directory));
240                 talloc_free(ctdb_db);
241                 return -1;
242         }
243
244         /* open the database */
245         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
246                                            ctdb->db_directory, 
247                                            db_name, ctdb->vnn);
248
249         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
250                                       ctdb->tunable.database_hash_size, 
251                                       TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
252         if (ctdb_db->ltdb == NULL) {
253                 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
254                 talloc_free(ctdb_db);
255                 return -1;
256         }
257
258         ctdb_check_db_empty(ctdb_db);
259
260         DLIST_ADD(ctdb->db_list, ctdb_db);
261
262         /* 
263            all databases support the "null" function. we need this in
264            order to do forced migration of records
265         */
266         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
267         if (ret != 0) {
268                 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
269                 talloc_free(ctdb_db);
270                 return -1;
271         }
272
273         /* 
274            all databases support the "fetch" function. we need this
275            for efficient Samba3 ctdb fetch
276         */
277         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
278         if (ret != 0) {
279                 DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
280                 talloc_free(ctdb_db);
281                 return -1;
282         }
283         
284         /* tell all the other nodes about this database */
285         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
286                                  CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
287                                  indata, NULL, NULL);
288
289         DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
290
291         /* success */
292         return 0;
293 }
294
295 /*
296   called when a broadcast seqnum update comes in
297  */
298 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
299 {
300         struct ctdb_db_context *ctdb_db;
301         if (srcnode == ctdb->vnn) {
302                 /* don't update ourselves! */
303                 return 0;
304         }
305
306         ctdb_db = find_ctdb_db(ctdb, db_id);
307         if (!ctdb_db) {
308                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
309                 return -1;
310         }
311
312         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
313         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
314         return 0;
315 }
316
317 /*
318   timer to check for seqnum changes in a ltdb and propogate them
319  */
320 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
321                                    struct timeval t, void *p)
322 {
323         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
324         struct ctdb_context *ctdb = ctdb_db->ctdb;
325         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
326         if (new_seqnum != ctdb_db->seqnum) {
327                 /* something has changed - propogate it */
328                 TDB_DATA data;
329                 data.dptr = (uint8_t *)&ctdb_db->db_id;
330                 data.dsize = sizeof(uint32_t);
331                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
332                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
333                                          data, NULL, NULL);             
334         }
335         ctdb_db->seqnum = new_seqnum;
336
337         /* setup a new timer */
338         ctdb_db->te = 
339                 event_add_timed(ctdb->ev, ctdb_db, 
340                                 timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
341                                 ctdb_ltdb_seqnum_check, ctdb_db);
342 }
343
344 /*
345   enable seqnum handling on this db
346  */
347 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
348 {
349         struct ctdb_db_context *ctdb_db;
350         ctdb_db = find_ctdb_db(ctdb, db_id);
351         if (!ctdb_db) {
352                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
353                 return -1;
354         }
355
356         if (ctdb_db->te == NULL) {
357                 ctdb_db->te = 
358                         event_add_timed(ctdb->ev, ctdb_db, 
359                                         timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
360                                         ctdb_ltdb_seqnum_check, ctdb_db);
361         }
362
363         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
364         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
365         return 0;
366 }
367