merge from tridge
[sahlberg/ctdb.git] / common / ctdb_ltdb.c
1 /* 
2    ctdb ltdb code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29
30 /*
31   find an attached ctdb_db handle given a name
32  */
33 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
34 {
35         struct ctdb_db_context *tmp_db;
36         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
37                 if (strcmp(name, tmp_db->db_name) == 0) {
38                         return tmp_db;
39                 }
40         }
41         return NULL;
42 }
43
44
45 /*
46   this is the dummy null procedure that all databases support
47 */
48 static int ctdb_null_func(struct ctdb_call_info *call)
49 {
50         return 0;
51 }
52
53 /*
54   this is a plain fetch procedure that all databases support
55 */
56 static int ctdb_fetch_func(struct ctdb_call_info *call)
57 {
58         call->reply_data = &call->record_data;
59         return 0;
60 }
61
62
63 /*
64   return the lmaster given a key
65 */
66 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
67 {
68         uint32_t idx, lmaster;
69
70         idx = ctdb_hash(key) % ctdb->vnn_map->size;
71         lmaster = ctdb->vnn_map->map[idx];
72
73         return lmaster;
74 }
75
76
77 /*
78   construct an initial header for a record with no ltdb header yet
79 */
80 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db, 
81                                 TDB_DATA key,
82                                 struct ctdb_ltdb_header *header)
83 {
84         header->rsn = 0;
85         /* initial dmaster is the lmaster */
86         header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
87         header->laccessor = header->dmaster;
88         header->lacount = 0;
89 }
90
91
92 /*
93   fetch a record from the ltdb, separating out the header information
94   and returning the body of the record. A valid (initial) header is
95   returned if the record is not present
96 */
97 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, 
98                     TDB_DATA key, struct ctdb_ltdb_header *header, 
99                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
100 {
101         TDB_DATA rec;
102         struct ctdb_context *ctdb = ctdb_db->ctdb;
103
104         rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
105         if (rec.dsize < sizeof(*header)) {
106                 TDB_DATA d2;
107                 /* return an initial header */
108                 if (rec.dptr) free(rec.dptr);
109                 ltdb_initial_header(ctdb_db, key, header);
110                 ZERO_STRUCT(d2);
111                 if (data) {
112                         *data = d2;
113                 }
114                 ctdb_ltdb_store(ctdb_db, key, header, d2);
115                 return 0;
116         }
117
118         *header = *(struct ctdb_ltdb_header *)rec.dptr;
119
120         if (data) {
121                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
122                 data->dptr = talloc_memdup(mem_ctx, 
123                                            sizeof(struct ctdb_ltdb_header)+rec.dptr,
124                                            data->dsize);
125         }
126
127         free(rec.dptr);
128         if (data) {
129                 CTDB_NO_MEMORY(ctdb, data->dptr);
130         }
131
132         return 0;
133 }
134
135
136 /*
137   fetch a record from the ltdb, separating out the header information
138   and returning the body of the record. A valid (initial) header is
139   returned if the record is not present
140 */
141 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
142                     struct ctdb_ltdb_header *header, TDB_DATA data)
143 {
144         struct ctdb_context *ctdb = ctdb_db->ctdb;
145         TDB_DATA rec;
146         int ret;
147
148         rec.dsize = sizeof(*header) + data.dsize;
149         rec.dptr = talloc_size(ctdb, rec.dsize);
150         CTDB_NO_MEMORY(ctdb, rec.dptr);
151
152         memcpy(rec.dptr, header, sizeof(*header));
153         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
154
155         ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
156         talloc_free(rec.dptr);
157
158         return ret;
159 }
160
161
162 /*
163   lock a record in the ltdb, given a key
164  */
165 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
166 {
167         return tdb_chainlock(ctdb_db->ltdb->tdb, key);
168 }
169
170 /*
171   unlock a record in the ltdb, given a key
172  */
173 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
174 {
175         int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
176         if (ret != 0) {
177                 DEBUG(0,("tdb_chainunlock failed\n"));
178         }
179         return ret;
180 }
181
182 struct lock_fetch_state {
183         struct ctdb_context *ctdb;
184         void (*recv_pkt)(void *, uint8_t *, uint32_t);
185         void *recv_context;
186         struct ctdb_req_header *hdr;
187 };
188
189 /*
190   called when we should retry the operation
191  */
192 static void lock_fetch_callback(void *p)
193 {
194         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
195         state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
196         talloc_free(state);
197         DEBUG(2,(__location__ " PACKET REQUEUED\n"));
198 }
199
200
201 /*
202   do a non-blocking ltdb_lock, deferring this ctdb request until we
203   have the chainlock
204
205   It does the following:
206
207    1) tries to get the chainlock. If it succeeds, then it returns 0
208
209    2) if it fails to get a chainlock immediately then it sets up a
210    non-blocking chainlock via ctdb_lockwait, and when it gets the
211    chainlock it re-submits this ctdb request to the main packet
212    receive function
213
214    This effectively queues all ctdb requests that cannot be
215    immediately satisfied until it can get the lock. This means that
216    the main ctdb daemon will not block waiting for a chainlock held by
217    a client
218
219    There are 3 possible return values:
220
221        0:    means that it got the lock immediately.
222       -1:    means that it failed to get the lock, and won't retry
223       -2:    means that it failed to get the lock immediately, but will retry
224  */
225 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
226                            TDB_DATA key, struct ctdb_req_header *hdr,
227                            void (*recv_pkt)(void *, uint8_t *, uint32_t ),
228                            void *recv_context)
229 {
230         int ret;
231         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
232         struct lockwait_handle *h;
233         struct lock_fetch_state *state;
234         
235         ret = tdb_chainlock_nonblock(tdb, key);
236
237         if (ret != 0 &&
238             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
239                 /* a hard failure - don't try again */
240                 return -1;
241         }
242
243         /* when torturing, ensure we test the contended path */
244         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
245             random() % 5 == 0) {
246                 ret = -1;
247                 tdb_chainunlock(tdb, key);
248         }
249
250         /* first the non-contended path */
251         if (ret == 0) {
252                 return 0;
253         }
254
255         state = talloc(ctdb_db, struct lock_fetch_state);
256         state->ctdb = ctdb_db->ctdb;
257         state->hdr = hdr;
258         state->recv_pkt = recv_pkt;
259         state->recv_context = recv_context;
260
261         /* now the contended path */
262         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
263         if (h == NULL) {
264                 tdb_chainunlock(tdb, key);
265                 return -1;
266         }
267
268         /* we need to move the packet off the temporary context in ctdb_recv_pkt(),
269            so it won't be freed yet */
270         talloc_steal(state, hdr);
271         talloc_steal(state, h);
272
273         /* now tell the caller than we will retry asynchronously */
274         return -2;
275 }
276
277 /*
278   a varient of ctdb_ltdb_lock_requeue that also fetches the record
279  */
280 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
281                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
282                                  struct ctdb_req_header *hdr, TDB_DATA *data,
283                                  void (*recv_pkt)(void *, uint8_t *, uint32_t ),
284                                  void *recv_context)
285 {
286         int ret;
287
288         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, recv_context);
289         if (ret == 0) {
290                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
291                 if (ret != 0) {
292                         ctdb_ltdb_unlock(ctdb_db, key);
293                 }
294         }
295         return ret;
296 }
297
298
299 /*
300   a client has asked to attach a new database
301  */
302 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
303                                TDB_DATA *outdata)
304 {
305         const char *db_name = (const char *)indata.dptr;
306         struct ctdb_db_context *ctdb_db, *tmp_db;
307         int ret;
308
309         /* see if we already have this name */
310         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
311                 if (strcmp(db_name, tmp_db->db_name) == 0) {
312                         /* this is not an error */
313                         outdata->dptr  = (uint8_t *)&tmp_db->db_id;
314                         outdata->dsize = sizeof(tmp_db->db_id);
315                         return 0;
316                 }
317         }
318
319         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
320         CTDB_NO_MEMORY(ctdb, ctdb_db);
321
322         ctdb_db->ctdb = ctdb;
323         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
324         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
325
326         ctdb_db->db_id = ctdb_hash(&indata);
327
328         outdata->dptr  = (uint8_t *)&ctdb_db->db_id;
329         outdata->dsize = sizeof(ctdb_db->db_id);
330
331         /* check for hash collisions */
332         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
333                 if (tmp_db->db_id == ctdb_db->db_id) {
334                         DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
335                                  db_name, tmp_db->db_name));
336                         talloc_free(ctdb_db);
337                         return -1;
338                 }
339         }
340
341         if (ctdb->db_directory == NULL) {
342                 ctdb->db_directory = VARDIR "/ctdb";
343         }
344
345         /* make sure the db directory exists */
346         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
347                 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
348                          ctdb->db_directory));
349                 talloc_free(ctdb_db);
350                 return -1;
351         }
352
353         /* open the database */
354         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
355                                            ctdb->db_directory, 
356                                            db_name, ctdb->vnn);
357
358         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 
359                                       TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
360         if (ctdb_db->ltdb == NULL) {
361                 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
362                 talloc_free(ctdb_db);
363                 return -1;
364         }
365         
366         DLIST_ADD(ctdb->db_list, ctdb_db);
367
368         /* 
369            all databases support the "null" function. we need this in
370            order to do forced migration of records
371         */
372         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
373         if (ret != 0) {
374                 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
375                 talloc_free(ctdb_db);
376                 return -1;
377         }
378
379         /* 
380            all databases support the "fetch" function. we need this
381            for efficient Samba3 ctdb fetch
382         */
383         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
384         if (ret != 0) {
385                 DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
386                 talloc_free(ctdb_db);
387                 return -1;
388         }
389         
390         /* tell all the other nodes about this database */
391         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNN, 0,
392                                  CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
393                                  indata, NULL, NULL);
394
395         DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
396
397         /* success */
398         return 0;
399 }
400