dbwrap ctdb: remove erroneously duplicated comment.
[samba.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_ctx {
27         struct tdb_wrap *wtdb;
28         uint32 db_id;
29 };
30
31 struct db_ctdb_rec {
32         struct db_ctdb_ctx *ctdb_ctx;
33         struct ctdb_ltdb_header header;
34 };
35
36 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
37 {
38         struct db_ctdb_rec *crec = talloc_get_type_abort(
39                 rec->private_data, struct db_ctdb_rec);
40         TDB_DATA cdata;
41         int ret;
42
43         cdata.dsize = sizeof(crec->header) + data.dsize;
44
45         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
46                 return NT_STATUS_NO_MEMORY;
47         }
48
49         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
50         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
51
52         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
53
54         SAFE_FREE(cdata.dptr);
55
56         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
57 }
58
59
60 /* for persistent databases the store is a bit different. We have to
61    ask the ctdb daemon to push the record to all nodes after the
62    store */
63 static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
64 {
65         struct db_ctdb_rec *crec = talloc_get_type_abort(
66                 rec->private_data, struct db_ctdb_rec);
67         TDB_DATA cdata;
68         int ret;
69         NTSTATUS status;
70
71         cdata.dsize = sizeof(crec->header) + data.dsize;
72
73         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
74                 return NT_STATUS_NO_MEMORY;
75         }
76
77         crec->header.rsn++;
78
79         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
80         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
81
82         status = ctdbd_start_persistent_update(messaging_ctdbd_connection(), crec->ctdb_ctx->db_id, rec->key, cdata);
83         
84         if (NT_STATUS_IS_OK(status)) {
85                 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
86                 status = (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
87         }
88
89         /* now tell ctdbd to update this record on all other nodes */
90         if (NT_STATUS_IS_OK(status)) {
91                 status = ctdbd_persistent_store(messaging_ctdbd_connection(), crec->ctdb_ctx->db_id, rec->key, cdata);
92         } else {
93                 ctdbd_cancel_persistent_update(messaging_ctdbd_connection(), crec->ctdb_ctx->db_id, rec->key, cdata);
94         }
95
96         SAFE_FREE(cdata.dptr);
97
98         return status;
99 }
100
101 static NTSTATUS db_ctdb_delete(struct db_record *rec)
102 {
103         struct db_ctdb_rec *crec = talloc_get_type_abort(
104                 rec->private_data, struct db_ctdb_rec);
105         TDB_DATA data;
106         int ret;
107
108         /*
109          * We have to store the header with empty data. TODO: Fix the
110          * tdb-level cleanup
111          */
112
113         data.dptr = (uint8 *)&crec->header;
114         data.dsize = sizeof(crec->header);
115
116         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, data, TDB_REPLACE);
117
118         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
119 }
120
121 static int db_ctdb_record_destr(struct db_record* data)
122 {
123         struct db_ctdb_rec *crec = talloc_get_type_abort(
124                 data->private_data, struct db_ctdb_rec);
125
126         DEBUG(10, (DEBUGLEVEL > 10
127                    ? "Unlocking db %u key %s\n"
128                    : "Unlocking db %u key %.20s\n",
129                    (int)crec->ctdb_ctx->db_id,
130                    hex_encode(data, (unsigned char *)data->key.dptr,
131                               data->key.dsize)));
132
133         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
134                 DEBUG(0, ("tdb_chainunlock failed\n"));
135                 return -1;
136         }
137
138         return 0;
139 }
140
141 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
142                                               TALLOC_CTX *mem_ctx,
143                                               TDB_DATA key)
144 {
145         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
146                                                         struct db_ctdb_ctx);
147         struct db_record *result;
148         struct db_ctdb_rec *crec;
149         NTSTATUS status;
150         TDB_DATA ctdb_data;
151         int migrate_attempts = 0;
152
153         if (!(result = talloc(mem_ctx, struct db_record))) {
154                 DEBUG(0, ("talloc failed\n"));
155                 return NULL;
156         }
157
158         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
159                 DEBUG(0, ("talloc failed\n"));
160                 TALLOC_FREE(result);
161                 return NULL;
162         }
163
164         result->private_data = (void *)crec;
165         crec->ctdb_ctx = ctx;
166
167         result->key.dsize = key.dsize;
168         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
169         if (result->key.dptr == NULL) {
170                 DEBUG(0, ("talloc failed\n"));
171                 TALLOC_FREE(result);
172                 return NULL;
173         }
174
175         /*
176          * Do a blocking lock on the record
177          */
178 again:
179
180         if (DEBUGLEVEL >= 10) {
181                 char *keystr = hex_encode(result, key.dptr, key.dsize);
182                 DEBUG(10, (DEBUGLEVEL > 10
183                            ? "Locking db %u key %s\n"
184                            : "Locking db %u key %.20s\n",
185                            (int)crec->ctdb_ctx->db_id, keystr));
186                 TALLOC_FREE(keystr);
187         }
188         
189         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
190                 DEBUG(3, ("tdb_chainlock failed\n"));
191                 TALLOC_FREE(result);
192                 return NULL;
193         }
194
195         if (db->persistent) {
196                 result->store = db_ctdb_store_persistent;
197         } else {
198                 result->store = db_ctdb_store;
199         }
200         result->delete_rec = db_ctdb_delete;
201         talloc_set_destructor(result, db_ctdb_record_destr);
202
203         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
204
205         /*
206          * See if we have a valid record and we are the dmaster. If so, we can
207          * take the shortcut and just return it.
208          */
209
210         if ((ctdb_data.dptr == NULL) ||
211             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
212             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
213 #if 0
214             || (random() % 2 != 0)
215 #endif
216 ) {
217                 SAFE_FREE(ctdb_data.dptr);
218                 tdb_chainunlock(ctx->wtdb->tdb, key);
219                 talloc_set_destructor(result, NULL);
220
221                 migrate_attempts += 1;
222
223                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
224                            ctdb_data.dptr, ctdb_data.dptr ?
225                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
226                            get_my_vnn()));
227
228                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
229                 if (!NT_STATUS_IS_OK(status)) {
230                         DEBUG(5, ("ctdb_migrate failed: %s\n",
231                                   nt_errstr(status)));
232                         TALLOC_FREE(result);
233                         return NULL;
234                 }
235                 /* now its migrated, try again */
236                 goto again;
237         }
238
239         if (migrate_attempts > 10) {
240                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
241                           migrate_attempts));
242         }
243
244         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
245
246         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
247         result->value.dptr = NULL;
248
249         if ((result->value.dsize != 0)
250             && !(result->value.dptr = (uint8 *)talloc_memdup(
251                          result, ctdb_data.dptr + sizeof(crec->header),
252                          result->value.dsize))) {
253                 DEBUG(0, ("talloc failed\n"));
254                 TALLOC_FREE(result);
255         }
256
257         SAFE_FREE(ctdb_data.dptr);
258
259         return result;
260 }
261
262 /*
263   fetch (unlocked, no migration) operation on ctdb
264  */
265 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
266                          TDB_DATA key, TDB_DATA *data)
267 {
268         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
269                                                         struct db_ctdb_ctx);
270         NTSTATUS status;
271         TDB_DATA ctdb_data;
272
273         /* try a direct fetch */
274         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
275
276         /*
277          * See if we have a valid record and we are the dmaster. If so, we can
278          * take the shortcut and just return it.
279          * we bypass the dmaster check for persistent databases
280          */
281         if ((ctdb_data.dptr != NULL) &&
282             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
283             (db->persistent ||
284              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
285                 /* we are the dmaster - avoid the ctdb protocol op */
286
287                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
288                 if (data->dsize == 0) {
289                         SAFE_FREE(ctdb_data.dptr);
290                         data->dptr = NULL;
291                         return 0;
292                 }
293
294                 data->dptr = (uint8 *)talloc_memdup(
295                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
296                         data->dsize);
297
298                 SAFE_FREE(ctdb_data.dptr);
299
300                 if (data->dptr == NULL) {
301                         return -1;
302                 }
303                 return 0;
304         }
305
306         SAFE_FREE(ctdb_data.dptr);
307
308         /* we weren't able to get it locally - ask ctdb to fetch it for us */
309         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
310         if (!NT_STATUS_IS_OK(status)) {
311                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
312                 return -1;
313         }
314
315         return 0;
316 }
317
318 struct traverse_state {
319         struct db_context *db;
320         int (*fn)(struct db_record *rec, void *private_data);
321         void *private_data;
322 };
323
324 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
325 {
326         struct traverse_state *state = (struct traverse_state *)private_data;
327         struct db_record *rec;
328         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
329         /* we have to give them a locked record to prevent races */
330         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
331         if (rec && rec->value.dsize > 0) {
332                 state->fn(rec, state->private_data);
333         }
334         talloc_free(tmp_ctx);
335 }
336
337 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
338                                         void *private_data)
339 {
340         struct traverse_state *state = (struct traverse_state *)private_data;
341         struct db_record *rec;
342         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
343         int ret = 0;
344         /* we have to give them a locked record to prevent races */
345         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
346         if (rec && rec->value.dsize > 0) {
347                 ret = state->fn(rec, state->private_data);
348         }
349         talloc_free(tmp_ctx);
350         return ret;
351 }
352
353 static int db_ctdb_traverse(struct db_context *db,
354                             int (*fn)(struct db_record *rec,
355                                       void *private_data),
356                             void *private_data)
357 {
358         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
359                                                         struct db_ctdb_ctx);
360         struct traverse_state state;
361
362         state.db = db;
363         state.fn = fn;
364         state.private_data = private_data;
365
366         if (db->persistent) {
367                 /* for persistent databases we don't need to do a ctdb traverse,
368                    we can do a faster local traverse */
369                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
370         }
371
372
373         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
374         return 0;
375 }
376
377 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
378 {
379         return NT_STATUS_MEDIA_WRITE_PROTECTED;
380 }
381
382 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
383 {
384         return NT_STATUS_MEDIA_WRITE_PROTECTED;
385 }
386
387 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
388 {
389         struct traverse_state *state = (struct traverse_state *)private_data;
390         struct db_record rec;
391         rec.key = key;
392         rec.value = data;
393         rec.store = db_ctdb_store_deny;
394         rec.delete_rec = db_ctdb_delete_deny;
395         rec.private_data = state->db;
396         state->fn(&rec, state->private_data);
397 }
398
399 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
400                                         void *private_data)
401 {
402         struct traverse_state *state = (struct traverse_state *)private_data;
403         struct db_record rec;
404         rec.key = kbuf;
405         rec.value = dbuf;
406         rec.store = db_ctdb_store_deny;
407         rec.delete_rec = db_ctdb_delete_deny;
408         rec.private_data = state->db;
409
410         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
411                 /* a deleted record */
412                 return 0;
413         }
414         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
415         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
416
417         return state->fn(&rec, state->private_data);
418 }
419
420 static int db_ctdb_traverse_read(struct db_context *db,
421                                  int (*fn)(struct db_record *rec,
422                                            void *private_data),
423                                  void *private_data)
424 {
425         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
426                                                         struct db_ctdb_ctx);
427         struct traverse_state state;
428
429         state.db = db;
430         state.fn = fn;
431         state.private_data = private_data;
432
433         if (db->persistent) {
434                 /* for persistent databases we don't need to do a ctdb traverse,
435                    we can do a faster local traverse */
436                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
437         }
438
439         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
440         return 0;
441 }
442
443 static int db_ctdb_get_seqnum(struct db_context *db)
444 {
445         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
446                                                         struct db_ctdb_ctx);
447         return tdb_get_seqnum(ctx->wtdb->tdb);
448 }
449
450 static int db_ctdb_trans_dummy(struct db_context *db)
451 {
452         /*
453          * Not implemented yet, just return ok
454          */
455         return 0;
456 }
457
458 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
459                                 const char *name,
460                                 int hash_size, int tdb_flags,
461                                 int open_flags, mode_t mode)
462 {
463         struct db_context *result;
464         struct db_ctdb_ctx *db_ctdb;
465         char *db_path;
466
467         if (!lp_clustering()) {
468                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
469                 return NULL;
470         }
471
472         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
473                 DEBUG(0, ("talloc failed\n"));
474                 TALLOC_FREE(result);
475                 return NULL;
476         }
477
478         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
479                 DEBUG(0, ("talloc failed\n"));
480                 TALLOC_FREE(result);
481                 return NULL;
482         }
483
484         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
485                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
486                 TALLOC_FREE(result);
487                 return NULL;
488         }
489
490         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
491
492         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
493
494         /* only pass through specific flags */
495         tdb_flags &= TDB_SEQNUM;
496
497         /* honor permissions if user has specified O_CREAT */
498         if (open_flags & O_CREAT) {
499                 chmod(db_path, mode);
500         }
501
502         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
503         if (db_ctdb->wtdb == NULL) {
504                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
505                 TALLOC_FREE(result);
506                 return NULL;
507         }
508         talloc_free(db_path);
509
510         result->private_data = (void *)db_ctdb;
511         result->fetch_locked = db_ctdb_fetch_locked;
512         result->fetch = db_ctdb_fetch;
513         result->traverse = db_ctdb_traverse;
514         result->traverse_read = db_ctdb_traverse_read;
515         result->get_seqnum = db_ctdb_get_seqnum;
516         result->transaction_start = db_ctdb_trans_dummy;
517         result->transaction_commit = db_ctdb_trans_dummy;
518         result->transaction_cancel = db_ctdb_trans_dummy;
519
520         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
521                  name, db_ctdb->db_id));
522
523         return result;
524 }
525 #endif