merge from tridge
[samba.git] / ctdb / common / ctdb_ltdb.c
1 /* 
2    ctdb ltdb code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29
30 /*
31   find an attached ctdb_db handle given a name
32  */
33 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
34 {
35         struct ctdb_db_context *tmp_db;
36         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
37                 if (strcmp(name, tmp_db->db_name) == 0) {
38                         return tmp_db;
39                 }
40         }
41         return NULL;
42 }
43
44
45 /*
46   this is the dummy null procedure that all databases support
47 */
48 static int ctdb_null_func(struct ctdb_call_info *call)
49 {
50         return 0;
51 }
52
53 /*
54   this is a plain fetch procedure that all databases support
55 */
56 static int ctdb_fetch_func(struct ctdb_call_info *call)
57 {
58         call->reply_data = &call->record_data;
59         return 0;
60 }
61
62
63 /*
64   return the lmaster given a key
65 */
66 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
67 {
68         uint32_t idx, lmaster;
69
70         idx = ctdb_hash(key) % ctdb->vnn_map->size;
71         lmaster = ctdb->vnn_map->map[idx];
72
73         return lmaster;
74 }
75
76
77 /*
78   construct an initial header for a record with no ltdb header yet
79 */
80 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db, 
81                                 TDB_DATA key,
82                                 struct ctdb_ltdb_header *header)
83 {
84         header->rsn = 0;
85         /* initial dmaster is the lmaster */
86         header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
87         header->laccessor = header->dmaster;
88         header->lacount = 0;
89 }
90
91
92 /*
93   fetch a record from the ltdb, separating out the header information
94   and returning the body of the record. A valid (initial) header is
95   returned if the record is not present
96 */
97 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, 
98                     TDB_DATA key, struct ctdb_ltdb_header *header, 
99                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
100 {
101         TDB_DATA rec;
102         struct ctdb_context *ctdb = ctdb_db->ctdb;
103
104         rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
105         if (rec.dsize < sizeof(*header)) {
106                 TDB_DATA d2;
107                 /* return an initial header */
108                 if (rec.dptr) free(rec.dptr);
109                 ltdb_initial_header(ctdb_db, key, header);
110                 ZERO_STRUCT(d2);
111                 if (data) {
112                         *data = d2;
113                 }
114                 ctdb_ltdb_store(ctdb_db, key, header, d2);
115                 return 0;
116         }
117
118         *header = *(struct ctdb_ltdb_header *)rec.dptr;
119
120         if (data) {
121                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
122                 data->dptr = talloc_memdup(mem_ctx, 
123                                            sizeof(struct ctdb_ltdb_header)+rec.dptr,
124                                            data->dsize);
125         }
126
127         free(rec.dptr);
128         if (data) {
129                 CTDB_NO_MEMORY(ctdb, data->dptr);
130         }
131
132         return 0;
133 }
134
135
136 /*
137   fetch a record from the ltdb, separating out the header information
138   and returning the body of the record. A valid (initial) header is
139   returned if the record is not present
140 */
141 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
142                     struct ctdb_ltdb_header *header, TDB_DATA data)
143 {
144         struct ctdb_context *ctdb = ctdb_db->ctdb;
145         TDB_DATA rec;
146         int ret;
147
148         if (ctdb->flags & CTDB_FLAG_TORTURE) {
149                 struct ctdb_ltdb_header *h2;
150                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
151                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
152                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
153                         DEBUG(0,("RSN regression! %llu %llu\n",
154                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
155                 }
156                 if (rec.dptr) free(rec.dptr);
157         }
158
159         rec.dsize = sizeof(*header) + data.dsize;
160         rec.dptr = talloc_size(ctdb, rec.dsize);
161         CTDB_NO_MEMORY(ctdb, rec.dptr);
162
163         memcpy(rec.dptr, header, sizeof(*header));
164         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
165
166         ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
167         talloc_free(rec.dptr);
168
169         return ret;
170 }
171
172
173 /*
174   lock a record in the ltdb, given a key
175  */
176 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
177 {
178         return tdb_chainlock(ctdb_db->ltdb->tdb, key);
179 }
180
181 /*
182   unlock a record in the ltdb, given a key
183  */
184 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
185 {
186         int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
187         if (ret != 0) {
188                 DEBUG(0,("tdb_chainunlock failed\n"));
189         }
190         return ret;
191 }
192
193 struct lock_fetch_state {
194         struct ctdb_context *ctdb;
195         void (*recv_pkt)(void *, uint8_t *, uint32_t);
196         void *recv_context;
197         struct ctdb_req_header *hdr;
198 };
199
200 /*
201   called when we should retry the operation
202  */
203 static void lock_fetch_callback(void *p)
204 {
205         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
206         state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
207         DEBUG(2,(__location__ " PACKET REQUEUED\n"));
208 }
209
210
211 /*
212   do a non-blocking ltdb_lock, deferring this ctdb request until we
213   have the chainlock
214
215   It does the following:
216
217    1) tries to get the chainlock. If it succeeds, then it returns 0
218
219    2) if it fails to get a chainlock immediately then it sets up a
220    non-blocking chainlock via ctdb_lockwait, and when it gets the
221    chainlock it re-submits this ctdb request to the main packet
222    receive function
223
224    This effectively queues all ctdb requests that cannot be
225    immediately satisfied until it can get the lock. This means that
226    the main ctdb daemon will not block waiting for a chainlock held by
227    a client
228
229    There are 3 possible return values:
230
231        0:    means that it got the lock immediately.
232       -1:    means that it failed to get the lock, and won't retry
233       -2:    means that it failed to get the lock immediately, but will retry
234  */
235 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
236                            TDB_DATA key, struct ctdb_req_header *hdr,
237                            void (*recv_pkt)(void *, uint8_t *, uint32_t ),
238                            void *recv_context)
239 {
240         int ret;
241         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
242         struct lockwait_handle *h;
243         struct lock_fetch_state *state;
244         
245         ret = tdb_chainlock_nonblock(tdb, key);
246
247         if (ret != 0 &&
248             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
249                 /* a hard failure - don't try again */
250                 return -1;
251         }
252
253         /* when torturing, ensure we test the contended path */
254         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
255             random() % 5 == 0) {
256                 ret = -1;
257                 tdb_chainunlock(tdb, key);
258         }
259
260         /* first the non-contended path */
261         if (ret == 0) {
262                 return 0;
263         }
264
265         state = talloc(hdr, struct lock_fetch_state);
266         state->ctdb = ctdb_db->ctdb;
267         state->hdr = hdr;
268         state->recv_pkt = recv_pkt;
269         state->recv_context = recv_context;
270
271         /* now the contended path */
272         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
273         if (h == NULL) {
274                 tdb_chainunlock(tdb, key);
275                 return -1;
276         }
277
278         /* we need to move the packet off the temporary context in ctdb_recv_pkt(),
279            so it won't be freed yet */
280         talloc_steal(state, hdr);
281         talloc_steal(state, h);
282
283         /* now tell the caller than we will retry asynchronously */
284         return -2;
285 }
286
287 /*
288   a varient of ctdb_ltdb_lock_requeue that also fetches the record
289  */
290 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
291                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
292                                  struct ctdb_req_header *hdr, TDB_DATA *data,
293                                  void (*recv_pkt)(void *, uint8_t *, uint32_t ),
294                                  void *recv_context)
295 {
296         int ret;
297
298         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, recv_context);
299         if (ret == 0) {
300                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
301                 if (ret != 0) {
302                         ctdb_ltdb_unlock(ctdb_db, key);
303                 }
304         }
305         return ret;
306 }
307
308
309 /*
310   a client has asked to attach a new database
311  */
312 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
313                                TDB_DATA *outdata)
314 {
315         const char *db_name = (const char *)indata.dptr;
316         struct ctdb_db_context *ctdb_db, *tmp_db;
317         int ret;
318
319         /* see if we already have this name */
320         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
321                 if (strcmp(db_name, tmp_db->db_name) == 0) {
322                         /* this is not an error */
323                         outdata->dptr  = (uint8_t *)&tmp_db->db_id;
324                         outdata->dsize = sizeof(tmp_db->db_id);
325                         return 0;
326                 }
327         }
328
329         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
330         CTDB_NO_MEMORY(ctdb, ctdb_db);
331
332         ctdb_db->ctdb = ctdb;
333         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
334         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
335
336         ctdb_db->db_id = ctdb_hash(&indata);
337
338         outdata->dptr  = (uint8_t *)&ctdb_db->db_id;
339         outdata->dsize = sizeof(ctdb_db->db_id);
340
341         /* check for hash collisions */
342         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
343                 if (tmp_db->db_id == ctdb_db->db_id) {
344                         DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
345                                  db_name, tmp_db->db_name));
346                         talloc_free(ctdb_db);
347                         return -1;
348                 }
349         }
350
351         if (ctdb->db_directory == NULL) {
352                 ctdb->db_directory = VARDIR "/ctdb";
353         }
354
355         /* make sure the db directory exists */
356         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
357                 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
358                          ctdb->db_directory));
359                 talloc_free(ctdb_db);
360                 return -1;
361         }
362
363         /* open the database */
364         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
365                                            ctdb->db_directory, 
366                                            db_name, ctdb->vnn);
367
368         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 
369                                       TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
370         if (ctdb_db->ltdb == NULL) {
371                 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
372                 talloc_free(ctdb_db);
373                 return -1;
374         }
375
376         DLIST_ADD(ctdb->db_list, ctdb_db);
377
378         /* 
379            all databases support the "null" function. we need this in
380            order to do forced migration of records
381         */
382         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
383         if (ret != 0) {
384                 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
385                 talloc_free(ctdb_db);
386                 return -1;
387         }
388
389         /* 
390            all databases support the "fetch" function. we need this
391            for efficient Samba3 ctdb fetch
392         */
393         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
394         if (ret != 0) {
395                 DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
396                 talloc_free(ctdb_db);
397                 return -1;
398         }
399         
400         /* tell all the other nodes about this database */
401         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
402                                  CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
403                                  indata, NULL, NULL);
404
405         DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
406
407         /* success */
408         return 0;
409 }
410
411 /*
412   called when a broadcast seqnum update comes in
413  */
414 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
415 {
416         struct ctdb_db_context *ctdb_db;
417         if (srcnode == ctdb->vnn) {
418                 /* don't update ourselves! */
419                 return 0;
420         }
421
422         ctdb_db = find_ctdb_db(ctdb, db_id);
423         if (!ctdb_db) {
424                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n"));
425                 return -1;
426         }
427
428         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
429         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
430         return 0;
431 }
432
433 /*
434   timer to check for seqnum changes in a ltdb and propogate them
435  */
436 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
437                                    struct timeval t, void *p)
438 {
439         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
440         struct ctdb_context *ctdb = ctdb_db->ctdb;
441         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
442         if (new_seqnum != ctdb_db->seqnum) {
443                 /* something has changed - propogate it */
444                 TDB_DATA data;
445                 data.dptr = (uint8_t *)&ctdb_db->db_id;
446                 data.dsize = sizeof(uint32_t);
447                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
448                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
449                                          data, NULL, NULL);             
450         }
451         ctdb_db->seqnum = new_seqnum;
452
453         /* setup a new timer */
454         ctdb_db->te = event_add_timed(ctdb->ev, ctdb_db, 
455                                       timeval_current_ofs(ctdb->seqnum_frequency, 0),
456                                       ctdb_ltdb_seqnum_check, ctdb_db);
457 }
458
459 /*
460   enable seqnum handling on this db
461  */
462 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
463 {
464         struct ctdb_db_context *ctdb_db;
465         ctdb_db = find_ctdb_db(ctdb, db_id);
466         if (!ctdb_db) {
467                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n"));
468                 return -1;
469         }
470
471         if (ctdb_db->te == NULL) {
472                 ctdb_db->te = event_add_timed(ctdb->ev, ctdb_db, 
473                                               timeval_current_ofs(ctdb->seqnum_frequency, 0),
474                                               ctdb_ltdb_seqnum_check, ctdb_db);
475         }
476
477         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
478         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
479         return 0;
480 }
481
482 /*
483   enable seqnum handling on this db
484  */
485 int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequency)
486 {
487         ctdb->seqnum_frequency = frequency;
488         return 0;
489 }