merge with tridges tree to resolve all conflicts
[sahlberg/ctdb.git] / common / ctdb_ltdb.c
1 /* 
2    ctdb ltdb code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29
30 /*
31   find an attached ctdb_db handle given a name
32  */
33 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
34 {
35         struct ctdb_db_context *tmp_db;
36         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
37                 if (strcmp(name, tmp_db->db_name) == 0) {
38                         return tmp_db;
39                 }
40         }
41         return NULL;
42 }
43
44
45 /*
46   this is the dummy null procedure that all databases support
47 */
48 static int ctdb_null_func(struct ctdb_call_info *call)
49 {
50         return 0;
51 }
52
53
54 /*
55   return the lmaster given a key
56 */
57 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
58 {
59         uint32_t idx, lmaster;
60
61         idx = ctdb_hash(key) % ctdb->vnn_map->size;
62         lmaster = ctdb->vnn_map->map[idx];
63
64         return lmaster;
65 }
66
67
68 /*
69   construct an initial header for a record with no ltdb header yet
70 */
71 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db, 
72                                 TDB_DATA key,
73                                 struct ctdb_ltdb_header *header)
74 {
75         header->rsn = 0;
76         /* initial dmaster is the lmaster */
77         header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
78         header->laccessor = header->dmaster;
79         header->lacount = 0;
80 }
81
82
83 /*
84   fetch a record from the ltdb, separating out the header information
85   and returning the body of the record. A valid (initial) header is
86   returned if the record is not present
87 */
88 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, 
89                     TDB_DATA key, struct ctdb_ltdb_header *header, 
90                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
91 {
92         TDB_DATA rec;
93         struct ctdb_context *ctdb = ctdb_db->ctdb;
94
95         rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
96         if (rec.dsize < sizeof(*header)) {
97                 TDB_DATA d2;
98                 /* return an initial header */
99                 if (rec.dptr) free(rec.dptr);
100                 ltdb_initial_header(ctdb_db, key, header);
101                 ZERO_STRUCT(d2);
102                 if (data) {
103                         *data = d2;
104                 }
105                 ctdb_ltdb_store(ctdb_db, key, header, d2);
106                 return 0;
107         }
108
109         *header = *(struct ctdb_ltdb_header *)rec.dptr;
110
111         if (data) {
112                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
113                 data->dptr = talloc_memdup(mem_ctx, 
114                                            sizeof(struct ctdb_ltdb_header)+rec.dptr,
115                                            data->dsize);
116         }
117
118         free(rec.dptr);
119         if (data) {
120                 CTDB_NO_MEMORY(ctdb, data->dptr);
121         }
122
123         return 0;
124 }
125
126
127 /*
128   fetch a record from the ltdb, separating out the header information
129   and returning the body of the record. A valid (initial) header is
130   returned if the record is not present
131 */
132 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
133                     struct ctdb_ltdb_header *header, TDB_DATA data)
134 {
135         struct ctdb_context *ctdb = ctdb_db->ctdb;
136         TDB_DATA rec;
137         int ret;
138
139         rec.dsize = sizeof(*header) + data.dsize;
140         rec.dptr = talloc_size(ctdb, rec.dsize);
141         CTDB_NO_MEMORY(ctdb, rec.dptr);
142
143         memcpy(rec.dptr, header, sizeof(*header));
144         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
145
146         ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
147         talloc_free(rec.dptr);
148
149         return ret;
150 }
151
152
153 /*
154   lock a record in the ltdb, given a key
155  */
156 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
157 {
158         return tdb_chainlock(ctdb_db->ltdb->tdb, key);
159 }
160
161 /*
162   unlock a record in the ltdb, given a key
163  */
164 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
165 {
166         int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
167         if (ret != 0) {
168                 DEBUG(0,("tdb_chainunlock failed\n"));
169         }
170         return ret;
171 }
172
173 struct lock_fetch_state {
174         struct ctdb_context *ctdb;
175         void (*recv_pkt)(void *, uint8_t *, uint32_t);
176         void *recv_context;
177         struct ctdb_req_header *hdr;
178 };
179
180 /*
181   called when we should retry the operation
182  */
183 static void lock_fetch_callback(void *p)
184 {
185         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
186         state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
187         talloc_free(state);
188         DEBUG(2,(__location__ " PACKET REQUEUED\n"));
189 }
190
191
192 /*
193   do a non-blocking ltdb_lock, deferring this ctdb request until we
194   have the chainlock
195
196   It does the following:
197
198    1) tries to get the chainlock. If it succeeds, then it returns 0
199
200    2) if it fails to get a chainlock immediately then it sets up a
201    non-blocking chainlock via ctdb_lockwait, and when it gets the
202    chainlock it re-submits this ctdb request to the main packet
203    receive function
204
205    This effectively queues all ctdb requests that cannot be
206    immediately satisfied until it can get the lock. This means that
207    the main ctdb daemon will not block waiting for a chainlock held by
208    a client
209
210    There are 3 possible return values:
211
212        0:    means that it got the lock immediately.
213       -1:    means that it failed to get the lock, and won't retry
214       -2:    means that it failed to get the lock immediately, but will retry
215  */
216 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
217                            TDB_DATA key, struct ctdb_req_header *hdr,
218                            void (*recv_pkt)(void *, uint8_t *, uint32_t ),
219                            void *recv_context)
220 {
221         int ret;
222         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
223         struct lockwait_handle *h;
224         struct lock_fetch_state *state;
225         
226         ret = tdb_chainlock_nonblock(tdb, key);
227
228         if (ret != 0 &&
229             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
230                 /* a hard failure - don't try again */
231                 return -1;
232         }
233
234         /* when torturing, ensure we test the contended path */
235         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
236             random() % 5 == 0) {
237                 ret = -1;
238                 tdb_chainunlock(tdb, key);
239         }
240
241         /* first the non-contended path */
242         if (ret == 0) {
243                 return 0;
244         }
245
246         state = talloc(ctdb_db, struct lock_fetch_state);
247         state->ctdb = ctdb_db->ctdb;
248         state->hdr = hdr;
249         state->recv_pkt = recv_pkt;
250         state->recv_context = recv_context;
251
252         /* now the contended path */
253         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
254         if (h == NULL) {
255                 tdb_chainunlock(tdb, key);
256                 return -1;
257         }
258
259         /* we need to move the packet off the temporary context in ctdb_recv_pkt(),
260            so it won't be freed yet */
261         talloc_steal(state, hdr);
262         talloc_steal(state, h);
263
264         /* now tell the caller than we will retry asynchronously */
265         return -2;
266 }
267
268 /*
269   a varient of ctdb_ltdb_lock_requeue that also fetches the record
270  */
271 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
272                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
273                                  struct ctdb_req_header *hdr, TDB_DATA *data,
274                                  void (*recv_pkt)(void *, uint8_t *, uint32_t ),
275                                  void *recv_context)
276 {
277         int ret;
278
279         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, recv_context);
280         if (ret == 0) {
281                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
282                 if (ret != 0) {
283                         ctdb_ltdb_unlock(ctdb_db, key);
284                 }
285         }
286         return ret;
287 }
288
289
290 /*
291   a client has asked to attach a new database
292  */
293 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
294                                TDB_DATA *outdata)
295 {
296         const char *db_name = (const char *)indata.dptr;
297         struct ctdb_db_context *ctdb_db, *tmp_db;
298         int ret;
299
300         /* see if we already have this name */
301         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
302                 if (strcmp(db_name, tmp_db->db_name) == 0) {
303                         /* this is not an error */
304                         outdata->dptr  = (uint8_t *)&tmp_db->db_id;
305                         outdata->dsize = sizeof(tmp_db->db_id);
306                         return 0;
307                 }
308         }
309
310         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
311         CTDB_NO_MEMORY(ctdb, ctdb_db);
312
313         ctdb_db->ctdb = ctdb;
314         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
315         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
316
317         ctdb_db->db_id = ctdb_hash(&indata);
318
319         outdata->dptr  = (uint8_t *)&ctdb_db->db_id;
320         outdata->dsize = sizeof(ctdb_db->db_id);
321
322         /* check for hash collisions */
323         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
324                 if (tmp_db->db_id == ctdb_db->db_id) {
325                         DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
326                                  db_name, tmp_db->db_name));
327                         talloc_free(ctdb_db);
328                         return -1;
329                 }
330         }
331
332         if (ctdb->db_directory == NULL) {
333                 ctdb->db_directory = VARDIR "/ctdb";
334         }
335
336         /* make sure the db directory exists */
337         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
338                 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
339                          ctdb->db_directory));
340                 talloc_free(ctdb_db);
341                 return -1;
342         }
343
344         /* open the database */
345         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
346                                            ctdb->db_directory, 
347                                            db_name, ctdb->vnn);
348
349         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 
350                                       TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
351         if (ctdb_db->ltdb == NULL) {
352                 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
353                 talloc_free(ctdb_db);
354                 return -1;
355         }
356         
357         DLIST_ADD(ctdb->db_list, ctdb_db);
358
359         /* 
360            all databases support the "null" function. we need this in
361            order to do forced migration of records
362         */
363         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
364         if (ret != 0) {
365                 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
366                 talloc_free(ctdb_db);
367                 return -1;
368         }
369         
370         /* tell all the other nodes about this database */
371         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNN, 0,
372                                  CTDB_CONTROL_DB_ATTACH, CTDB_CTRL_FLAG_NOREPLY,
373                                  indata, NULL, NULL);
374
375         DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
376
377         /* success */
378         return 0;
379 }
380