paraoid check for empty db on attach
[samba.git] / ctdb / common / ctdb_ltdb.c
1 /* 
2    ctdb ltdb code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29
30 /*
31   find an attached ctdb_db handle given a name
32  */
33 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
34 {
35         struct ctdb_db_context *tmp_db;
36         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
37                 if (strcmp(name, tmp_db->db_name) == 0) {
38                         return tmp_db;
39                 }
40         }
41         return NULL;
42 }
43
44
45 /*
46   this is the dummy null procedure that all databases support
47 */
48 static int ctdb_null_func(struct ctdb_call_info *call)
49 {
50         return 0;
51 }
52
53 /*
54   this is a plain fetch procedure that all databases support
55 */
56 static int ctdb_fetch_func(struct ctdb_call_info *call)
57 {
58         call->reply_data = &call->record_data;
59         return 0;
60 }
61
62
63 /*
64   return the lmaster given a key
65 */
66 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
67 {
68         uint32_t idx, lmaster;
69
70         idx = ctdb_hash(key) % ctdb->vnn_map->size;
71         lmaster = ctdb->vnn_map->map[idx];
72
73         return lmaster;
74 }
75
76
77 /*
78   construct an initial header for a record with no ltdb header yet
79 */
80 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db, 
81                                 TDB_DATA key,
82                                 struct ctdb_ltdb_header *header)
83 {
84         header->rsn = 0;
85         /* initial dmaster is the lmaster */
86         header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
87         header->laccessor = header->dmaster;
88         header->lacount = 0;
89 }
90
91
92 /*
93   fetch a record from the ltdb, separating out the header information
94   and returning the body of the record. A valid (initial) header is
95   returned if the record is not present
96 */
97 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, 
98                     TDB_DATA key, struct ctdb_ltdb_header *header, 
99                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
100 {
101         TDB_DATA rec;
102         struct ctdb_context *ctdb = ctdb_db->ctdb;
103
104         rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
105         if (rec.dsize < sizeof(*header)) {
106                 TDB_DATA d2;
107                 /* return an initial header */
108                 if (rec.dptr) free(rec.dptr);
109                 ltdb_initial_header(ctdb_db, key, header);
110                 ZERO_STRUCT(d2);
111                 if (data) {
112                         *data = d2;
113                 }
114                 ctdb_ltdb_store(ctdb_db, key, header, d2);
115                 return 0;
116         }
117
118         *header = *(struct ctdb_ltdb_header *)rec.dptr;
119
120         if (data) {
121                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
122                 data->dptr = talloc_memdup(mem_ctx, 
123                                            sizeof(struct ctdb_ltdb_header)+rec.dptr,
124                                            data->dsize);
125         }
126
127         free(rec.dptr);
128         if (data) {
129                 CTDB_NO_MEMORY(ctdb, data->dptr);
130         }
131
132         return 0;
133 }
134
135
136 /*
137   fetch a record from the ltdb, separating out the header information
138   and returning the body of the record. A valid (initial) header is
139   returned if the record is not present
140 */
141 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
142                     struct ctdb_ltdb_header *header, TDB_DATA data)
143 {
144         struct ctdb_context *ctdb = ctdb_db->ctdb;
145         TDB_DATA rec;
146         int ret;
147
148         if (ctdb->flags & CTDB_FLAG_TORTURE) {
149                 struct ctdb_ltdb_header *h2;
150                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
151                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
152                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
153                         DEBUG(0,("RSN regression! %llu %llu\n",
154                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
155                 }
156                 if (rec.dptr) free(rec.dptr);
157         }
158
159         rec.dsize = sizeof(*header) + data.dsize;
160         rec.dptr = talloc_size(ctdb, rec.dsize);
161         CTDB_NO_MEMORY(ctdb, rec.dptr);
162
163         memcpy(rec.dptr, header, sizeof(*header));
164         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
165
166         ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
167         talloc_free(rec.dptr);
168
169         return ret;
170 }
171
172
173 /*
174   lock a record in the ltdb, given a key
175  */
176 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
177 {
178         return tdb_chainlock(ctdb_db->ltdb->tdb, key);
179 }
180
181 /*
182   unlock a record in the ltdb, given a key
183  */
184 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
185 {
186         int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
187         if (ret != 0) {
188                 DEBUG(0,("tdb_chainunlock failed\n"));
189         }
190         return ret;
191 }
192
193 struct lock_fetch_state {
194         struct ctdb_context *ctdb;
195         void (*recv_pkt)(void *, struct ctdb_req_header *);
196         void *recv_context;
197         struct ctdb_req_header *hdr;
198         uint32_t generation;
199         bool ignore_generation;
200 };
201
202 /*
203   called when we should retry the operation
204  */
205 static void lock_fetch_callback(void *p)
206 {
207         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
208         if (!state->ignore_generation &&
209             state->generation != state->ctdb->vnn_map->generation) {
210                 DEBUG(0,("Discarding previous generation lockwait packet\n"));
211                 talloc_free(state->hdr);
212                 return;
213         }
214         state->recv_pkt(state->recv_context, state->hdr);
215         DEBUG(2,(__location__ " PACKET REQUEUED\n"));
216 }
217
218
219 /*
220   do a non-blocking ltdb_lock, deferring this ctdb request until we
221   have the chainlock
222
223   It does the following:
224
225    1) tries to get the chainlock. If it succeeds, then it returns 0
226
227    2) if it fails to get a chainlock immediately then it sets up a
228    non-blocking chainlock via ctdb_lockwait, and when it gets the
229    chainlock it re-submits this ctdb request to the main packet
230    receive function
231
232    This effectively queues all ctdb requests that cannot be
233    immediately satisfied until it can get the lock. This means that
234    the main ctdb daemon will not block waiting for a chainlock held by
235    a client
236
237    There are 3 possible return values:
238
239        0:    means that it got the lock immediately.
240       -1:    means that it failed to get the lock, and won't retry
241       -2:    means that it failed to get the lock immediately, but will retry
242  */
243 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
244                            TDB_DATA key, struct ctdb_req_header *hdr,
245                            void (*recv_pkt)(void *, struct ctdb_req_header *),
246                            void *recv_context, bool ignore_generation)
247 {
248         int ret;
249         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
250         struct lockwait_handle *h;
251         struct lock_fetch_state *state;
252         
253         ret = tdb_chainlock_nonblock(tdb, key);
254
255         if (ret != 0 &&
256             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
257                 /* a hard failure - don't try again */
258                 return -1;
259         }
260
261         /* when torturing, ensure we test the contended path */
262         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
263             random() % 5 == 0) {
264                 ret = -1;
265                 tdb_chainunlock(tdb, key);
266         }
267
268         /* first the non-contended path */
269         if (ret == 0) {
270                 return 0;
271         }
272
273         state = talloc(hdr, struct lock_fetch_state);
274         state->ctdb = ctdb_db->ctdb;
275         state->hdr = hdr;
276         state->recv_pkt = recv_pkt;
277         state->recv_context = recv_context;
278         state->generation = ctdb_db->ctdb->vnn_map->generation;
279         state->ignore_generation = ignore_generation;
280
281         /* now the contended path */
282         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
283         if (h == NULL) {
284                 tdb_chainunlock(tdb, key);
285                 return -1;
286         }
287
288         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
289            so it won't be freed yet */
290         talloc_steal(state, hdr);
291         talloc_steal(state, h);
292
293         /* now tell the caller than we will retry asynchronously */
294         return -2;
295 }
296
297 /*
298   a varient of ctdb_ltdb_lock_requeue that also fetches the record
299  */
300 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
301                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
302                                  struct ctdb_req_header *hdr, TDB_DATA *data,
303                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
304                                  void *recv_context, bool ignore_generation)
305 {
306         int ret;
307
308         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
309                                      recv_context, ignore_generation);
310         if (ret == 0) {
311                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
312                 if (ret != 0) {
313                         ctdb_ltdb_unlock(ctdb_db, key);
314                 }
315         }
316         return ret;
317 }
318
319
320 /*
321   paraoid check to see if the db is empty
322  */
323 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
324 {
325         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
326         int count = tdb_traverse_read(tdb, NULL, NULL);
327         if (count != 0) {
328                 DEBUG(0,(__location__ " tdb '%s' not empty on attach! aborting\n",
329                          ctdb_db->db_path));
330                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
331         }
332 }
333
334 /*
335   a client has asked to attach a new database
336  */
337 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
338                                TDB_DATA *outdata)
339 {
340         const char *db_name = (const char *)indata.dptr;
341         struct ctdb_db_context *ctdb_db, *tmp_db;
342         int ret;
343
344         /* see if we already have this name */
345         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
346                 if (strcmp(db_name, tmp_db->db_name) == 0) {
347                         /* this is not an error */
348                         outdata->dptr  = (uint8_t *)&tmp_db->db_id;
349                         outdata->dsize = sizeof(tmp_db->db_id);
350                         return 0;
351                 }
352         }
353
354         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
355         CTDB_NO_MEMORY(ctdb, ctdb_db);
356
357         ctdb_db->ctdb = ctdb;
358         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
359         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
360
361         ctdb_db->db_id = ctdb_hash(&indata);
362
363         outdata->dptr  = (uint8_t *)&ctdb_db->db_id;
364         outdata->dsize = sizeof(ctdb_db->db_id);
365
366         /* check for hash collisions */
367         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
368                 if (tmp_db->db_id == ctdb_db->db_id) {
369                         DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
370                                  db_name, tmp_db->db_name));
371                         talloc_free(ctdb_db);
372                         return -1;
373                 }
374         }
375
376         if (ctdb->db_directory == NULL) {
377                 ctdb->db_directory = VARDIR "/ctdb";
378         }
379
380         /* make sure the db directory exists */
381         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
382                 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
383                          ctdb->db_directory));
384                 talloc_free(ctdb_db);
385                 return -1;
386         }
387
388         /* open the database */
389         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
390                                            ctdb->db_directory, 
391                                            db_name, ctdb->vnn);
392
393         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 
394                                       TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
395         if (ctdb_db->ltdb == NULL) {
396                 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
397                 talloc_free(ctdb_db);
398                 return -1;
399         }
400
401         ctdb_check_db_empty(ctdb_db);
402
403         DLIST_ADD(ctdb->db_list, ctdb_db);
404
405         /* 
406            all databases support the "null" function. we need this in
407            order to do forced migration of records
408         */
409         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
410         if (ret != 0) {
411                 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
412                 talloc_free(ctdb_db);
413                 return -1;
414         }
415
416         /* 
417            all databases support the "fetch" function. we need this
418            for efficient Samba3 ctdb fetch
419         */
420         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
421         if (ret != 0) {
422                 DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
423                 talloc_free(ctdb_db);
424                 return -1;
425         }
426         
427         /* tell all the other nodes about this database */
428         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
429                                  CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
430                                  indata, NULL, NULL);
431
432         DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
433
434         /* success */
435         return 0;
436 }
437
438 /*
439   called when a broadcast seqnum update comes in
440  */
441 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
442 {
443         struct ctdb_db_context *ctdb_db;
444         if (srcnode == ctdb->vnn) {
445                 /* don't update ourselves! */
446                 return 0;
447         }
448
449         ctdb_db = find_ctdb_db(ctdb, db_id);
450         if (!ctdb_db) {
451                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n"));
452                 return -1;
453         }
454
455         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
456         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
457         return 0;
458 }
459
460 /*
461   timer to check for seqnum changes in a ltdb and propogate them
462  */
463 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
464                                    struct timeval t, void *p)
465 {
466         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
467         struct ctdb_context *ctdb = ctdb_db->ctdb;
468         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
469         if (new_seqnum != ctdb_db->seqnum) {
470                 /* something has changed - propogate it */
471                 TDB_DATA data;
472                 data.dptr = (uint8_t *)&ctdb_db->db_id;
473                 data.dsize = sizeof(uint32_t);
474                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
475                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
476                                          data, NULL, NULL);             
477         }
478         ctdb_db->seqnum = new_seqnum;
479
480         /* setup a new timer */
481         ctdb_db->te = event_add_timed(ctdb->ev, ctdb_db, 
482                                       timeval_current_ofs(ctdb->seqnum_frequency, 0),
483                                       ctdb_ltdb_seqnum_check, ctdb_db);
484 }
485
486 /*
487   enable seqnum handling on this db
488  */
489 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
490 {
491         struct ctdb_db_context *ctdb_db;
492         ctdb_db = find_ctdb_db(ctdb, db_id);
493         if (!ctdb_db) {
494                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n"));
495                 return -1;
496         }
497
498         if (ctdb_db->te == NULL) {
499                 ctdb_db->te = event_add_timed(ctdb->ev, ctdb_db, 
500                                               timeval_current_ofs(ctdb->seqnum_frequency, 0),
501                                               ctdb_ltdb_seqnum_check, ctdb_db);
502         }
503
504         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
505         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
506         return 0;
507 }
508
509 /*
510   enable seqnum handling on this db
511  */
512 int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequency)
513 {
514         ctdb->seqnum_frequency = frequency;
515         return 0;
516 }