dbwrap ctdb: add db_ctdb_delete_persistent() and use it for persistent DBs
[metze/samba/wip.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_ctx {
27         struct tdb_wrap *wtdb;
28         uint32 db_id;
29 };
30
31 struct db_ctdb_rec {
32         struct db_ctdb_ctx *ctdb_ctx;
33         struct ctdb_ltdb_header header;
34 };
35
36 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
37                                                TALLOC_CTX *mem_ctx,
38                                                TDB_DATA key,
39                                                bool persistent);
40
41 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
42 {
43         struct db_ctdb_rec *crec = talloc_get_type_abort(
44                 rec->private_data, struct db_ctdb_rec);
45         TDB_DATA cdata;
46         int ret;
47
48         cdata.dsize = sizeof(crec->header) + data.dsize;
49
50         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
51                 return NT_STATUS_NO_MEMORY;
52         }
53
54         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
55         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
56
57         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
58
59         SAFE_FREE(cdata.dptr);
60
61         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
62 }
63
64
65 /* for persistent databases the store is a bit different. We have to
66    ask the ctdb daemon to push the record to all nodes after the
67    store */
68 static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
69 {
70         struct db_ctdb_rec *crec;
71         struct db_record *record;
72         TDB_DATA cdata;
73         int ret;
74         NTSTATUS status;
75         uint32_t count;
76         int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
77
78         for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec;
79              (count < max_retries) && !NT_STATUS_IS_OK(status);
80              count++)
81         {
82                 if (count > 0) {
83                         /* retry */
84                         /*
85                          * There is a hack here: We use rec as a memory
86                          * context and re-use it as the record struct ptr.
87                          * We don't free the record data allocated
88                          * in each turn. So all gets freed when the caller
89                          * releases the original record. This is because
90                          * we don't get the record passed in by reference
91                          * in the first place and the caller relies on
92                          * having to free the record himself.
93                          */
94                         record = fetch_locked_internal(crec->ctdb_ctx,
95                                                        rec,
96                                                        rec->key,
97                                                        true /* persistent */);
98                         if (record == NULL) {
99                                 DEBUG(5, ("fetch_locked_internal failed.\n"));
100                                 status = NT_STATUS_NO_MEMORY;
101                                 break;
102                         }
103                 }
104
105                 crec = talloc_get_type_abort(record->private_data,
106                                              struct db_ctdb_rec);
107
108                 cdata.dsize = sizeof(crec->header) + data.dsize;
109
110                 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
111                         return NT_STATUS_NO_MEMORY;
112                 }
113
114                 crec->header.rsn++;
115
116                 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
117                 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
118
119                 status = ctdbd_start_persistent_update(
120                                 messaging_ctdbd_connection(),
121                                 crec->ctdb_ctx->db_id,
122                                 rec->key,
123                                 cdata);
124
125                 if (NT_STATUS_IS_OK(status)) {
126                         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key,
127                                         cdata, TDB_REPLACE);
128                         status = (ret == 0) ? NT_STATUS_OK
129                                             : NT_STATUS_INTERNAL_DB_CORRUPTION;
130                 }
131
132                 /*
133                  * release the lock *now* in order to prevent deadlocks.
134                  *
135                  * There is a tradeoff: Usually, the record is still locked
136                  * after db->store operation. This lock is usually released
137                  * via the talloc destructor with the TALLOC_FREE to
138                  * the record. So we have two choices:
139                  *
140                  * - Either re-lock the record after the call to persistent_store
141                  *   or cancel_persistent update and this way not changing any
142                  *   assumptions callers may have about the state, but possibly
143                  *   introducing new race conditions.
144                  *
145                  * - Or don't lock the record again but just remove the
146                  *   talloc_destructor. This is less racy but assumes that
147                  *   the lock is always released via TALLOC_FREE of the record.
148                  *
149                  * I choose the first variant for now since it seems less racy.
150                  * We can't guarantee that we succeed in getting the lock
151                  * anyways. The only real danger here is that a caller
152                  * performs multiple store operations after a fetch_locked()
153                  * which is currently not the case.
154                  */
155                 tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key);
156                 talloc_set_destructor(record, NULL);
157
158                 /* now tell ctdbd to update this record on all other nodes */
159                 if (NT_STATUS_IS_OK(status)) {
160                         status = ctdbd_persistent_store(
161                                         messaging_ctdbd_connection(),
162                                         crec->ctdb_ctx->db_id,
163                                         rec->key,
164                                         cdata);
165                 } else {
166                         ctdbd_cancel_persistent_update(
167                                         messaging_ctdbd_connection(),
168                                         crec->ctdb_ctx->db_id,
169                                         rec->key,
170                                         cdata);
171                 }
172
173                 SAFE_FREE(cdata.dptr);
174         } /* retry-loop */
175
176         if (!NT_STATUS_IS_OK(status)) {
177                 DEBUG(5, ("ctdbd_persistent_store still failed after "
178                           "%d retries with error %s - giving up.\n",
179                           count, nt_errstr(status)));
180         }
181
182         SAFE_FREE(cdata.dptr);
183
184         return status;
185 }
186
187 static NTSTATUS db_ctdb_delete(struct db_record *rec)
188 {
189         TDB_DATA data;
190
191         /*
192          * We have to store the header with empty data. TODO: Fix the
193          * tdb-level cleanup
194          */
195
196         ZERO_STRUCT(data);
197
198         return db_ctdb_store(rec, data, 0);
199
200 }
201
202 static NTSTATUS db_ctdb_delete_persistent(struct db_record *rec)
203 {
204         TDB_DATA data;
205
206         /*
207          * We have to store the header with empty data. TODO: Fix the
208          * tdb-level cleanup
209          */
210
211         ZERO_STRUCT(data);
212
213         return db_ctdb_store_persistent(rec, data, 0);
214
215 }
216
217 static int db_ctdb_record_destr(struct db_record* data)
218 {
219         struct db_ctdb_rec *crec = talloc_get_type_abort(
220                 data->private_data, struct db_ctdb_rec);
221
222         DEBUG(10, (DEBUGLEVEL > 10
223                    ? "Unlocking db %u key %s\n"
224                    : "Unlocking db %u key %.20s\n",
225                    (int)crec->ctdb_ctx->db_id,
226                    hex_encode(data, (unsigned char *)data->key.dptr,
227                               data->key.dsize)));
228
229         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
230                 DEBUG(0, ("tdb_chainunlock failed\n"));
231                 return -1;
232         }
233
234         return 0;
235 }
236
237 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
238                                                TALLOC_CTX *mem_ctx,
239                                                TDB_DATA key,
240                                                bool persistent)
241 {
242         struct db_record *result;
243         struct db_ctdb_rec *crec;
244         NTSTATUS status;
245         TDB_DATA ctdb_data;
246         int migrate_attempts = 0;
247
248         if (!(result = talloc(mem_ctx, struct db_record))) {
249                 DEBUG(0, ("talloc failed\n"));
250                 return NULL;
251         }
252
253         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
254                 DEBUG(0, ("talloc failed\n"));
255                 TALLOC_FREE(result);
256                 return NULL;
257         }
258
259         result->private_data = (void *)crec;
260         crec->ctdb_ctx = ctx;
261
262         result->key.dsize = key.dsize;
263         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
264         if (result->key.dptr == NULL) {
265                 DEBUG(0, ("talloc failed\n"));
266                 TALLOC_FREE(result);
267                 return NULL;
268         }
269
270         /*
271          * Do a blocking lock on the record
272          */
273 again:
274
275         if (DEBUGLEVEL >= 10) {
276                 char *keystr = hex_encode(result, key.dptr, key.dsize);
277                 DEBUG(10, (DEBUGLEVEL > 10
278                            ? "Locking db %u key %s\n"
279                            : "Locking db %u key %.20s\n",
280                            (int)crec->ctdb_ctx->db_id, keystr));
281                 TALLOC_FREE(keystr);
282         }
283         
284         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
285                 DEBUG(3, ("tdb_chainlock failed\n"));
286                 TALLOC_FREE(result);
287                 return NULL;
288         }
289
290         if (persistent) {
291                 result->store = db_ctdb_store_persistent;
292                 result->delete_rec = db_ctdb_delete_persistent;
293         } else {
294                 result->store = db_ctdb_store;
295                 result->delete_rec = db_ctdb_delete;
296         }
297         talloc_set_destructor(result, db_ctdb_record_destr);
298
299         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
300
301         /*
302          * See if we have a valid record and we are the dmaster. If so, we can
303          * take the shortcut and just return it.
304          */
305
306         if ((ctdb_data.dptr == NULL) ||
307             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
308             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
309 #if 0
310             || (random() % 2 != 0)
311 #endif
312 ) {
313                 SAFE_FREE(ctdb_data.dptr);
314                 tdb_chainunlock(ctx->wtdb->tdb, key);
315                 talloc_set_destructor(result, NULL);
316
317                 migrate_attempts += 1;
318
319                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
320                            ctdb_data.dptr, ctdb_data.dptr ?
321                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
322                            get_my_vnn()));
323
324                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
325                 if (!NT_STATUS_IS_OK(status)) {
326                         DEBUG(5, ("ctdb_migrate failed: %s\n",
327                                   nt_errstr(status)));
328                         TALLOC_FREE(result);
329                         return NULL;
330                 }
331                 /* now its migrated, try again */
332                 goto again;
333         }
334
335         if (migrate_attempts > 10) {
336                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
337                           migrate_attempts));
338         }
339
340         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
341
342         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
343         result->value.dptr = NULL;
344
345         if ((result->value.dsize != 0)
346             && !(result->value.dptr = (uint8 *)talloc_memdup(
347                          result, ctdb_data.dptr + sizeof(crec->header),
348                          result->value.dsize))) {
349                 DEBUG(0, ("talloc failed\n"));
350                 TALLOC_FREE(result);
351         }
352
353         SAFE_FREE(ctdb_data.dptr);
354
355         return result;
356 }
357
358 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
359                                               TALLOC_CTX *mem_ctx,
360                                               TDB_DATA key)
361 {
362         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
363                                                         struct db_ctdb_ctx);
364
365         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
366 }
367
368 /*
369   fetch (unlocked, no migration) operation on ctdb
370  */
371 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
372                          TDB_DATA key, TDB_DATA *data)
373 {
374         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
375                                                         struct db_ctdb_ctx);
376         NTSTATUS status;
377         TDB_DATA ctdb_data;
378
379         /* try a direct fetch */
380         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
381
382         /*
383          * See if we have a valid record and we are the dmaster. If so, we can
384          * take the shortcut and just return it.
385          * we bypass the dmaster check for persistent databases
386          */
387         if ((ctdb_data.dptr != NULL) &&
388             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
389             (db->persistent ||
390              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
391                 /* we are the dmaster - avoid the ctdb protocol op */
392
393                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
394                 if (data->dsize == 0) {
395                         SAFE_FREE(ctdb_data.dptr);
396                         data->dptr = NULL;
397                         return 0;
398                 }
399
400                 data->dptr = (uint8 *)talloc_memdup(
401                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
402                         data->dsize);
403
404                 SAFE_FREE(ctdb_data.dptr);
405
406                 if (data->dptr == NULL) {
407                         return -1;
408                 }
409                 return 0;
410         }
411
412         SAFE_FREE(ctdb_data.dptr);
413
414         /* we weren't able to get it locally - ask ctdb to fetch it for us */
415         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
416         if (!NT_STATUS_IS_OK(status)) {
417                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
418                 return -1;
419         }
420
421         return 0;
422 }
423
424 struct traverse_state {
425         struct db_context *db;
426         int (*fn)(struct db_record *rec, void *private_data);
427         void *private_data;
428 };
429
430 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
431 {
432         struct traverse_state *state = (struct traverse_state *)private_data;
433         struct db_record *rec;
434         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
435         /* we have to give them a locked record to prevent races */
436         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
437         if (rec && rec->value.dsize > 0) {
438                 state->fn(rec, state->private_data);
439         }
440         talloc_free(tmp_ctx);
441 }
442
443 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
444                                         void *private_data)
445 {
446         struct traverse_state *state = (struct traverse_state *)private_data;
447         struct db_record *rec;
448         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
449         int ret = 0;
450         /* we have to give them a locked record to prevent races */
451         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
452         if (rec && rec->value.dsize > 0) {
453                 ret = state->fn(rec, state->private_data);
454         }
455         talloc_free(tmp_ctx);
456         return ret;
457 }
458
459 static int db_ctdb_traverse(struct db_context *db,
460                             int (*fn)(struct db_record *rec,
461                                       void *private_data),
462                             void *private_data)
463 {
464         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
465                                                         struct db_ctdb_ctx);
466         struct traverse_state state;
467
468         state.db = db;
469         state.fn = fn;
470         state.private_data = private_data;
471
472         if (db->persistent) {
473                 /* for persistent databases we don't need to do a ctdb traverse,
474                    we can do a faster local traverse */
475                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
476         }
477
478
479         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
480         return 0;
481 }
482
483 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
484 {
485         return NT_STATUS_MEDIA_WRITE_PROTECTED;
486 }
487
488 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
489 {
490         return NT_STATUS_MEDIA_WRITE_PROTECTED;
491 }
492
493 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
494 {
495         struct traverse_state *state = (struct traverse_state *)private_data;
496         struct db_record rec;
497         rec.key = key;
498         rec.value = data;
499         rec.store = db_ctdb_store_deny;
500         rec.delete_rec = db_ctdb_delete_deny;
501         rec.private_data = state->db;
502         state->fn(&rec, state->private_data);
503 }
504
505 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
506                                         void *private_data)
507 {
508         struct traverse_state *state = (struct traverse_state *)private_data;
509         struct db_record rec;
510         rec.key = kbuf;
511         rec.value = dbuf;
512         rec.store = db_ctdb_store_deny;
513         rec.delete_rec = db_ctdb_delete_deny;
514         rec.private_data = state->db;
515
516         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
517                 /* a deleted record */
518                 return 0;
519         }
520         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
521         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
522
523         return state->fn(&rec, state->private_data);
524 }
525
526 static int db_ctdb_traverse_read(struct db_context *db,
527                                  int (*fn)(struct db_record *rec,
528                                            void *private_data),
529                                  void *private_data)
530 {
531         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
532                                                         struct db_ctdb_ctx);
533         struct traverse_state state;
534
535         state.db = db;
536         state.fn = fn;
537         state.private_data = private_data;
538
539         if (db->persistent) {
540                 /* for persistent databases we don't need to do a ctdb traverse,
541                    we can do a faster local traverse */
542                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
543         }
544
545         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
546         return 0;
547 }
548
549 static int db_ctdb_get_seqnum(struct db_context *db)
550 {
551         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
552                                                         struct db_ctdb_ctx);
553         return tdb_get_seqnum(ctx->wtdb->tdb);
554 }
555
556 static int db_ctdb_trans_dummy(struct db_context *db)
557 {
558         /*
559          * Not implemented yet, just return ok
560          */
561         return 0;
562 }
563
564 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
565                                 const char *name,
566                                 int hash_size, int tdb_flags,
567                                 int open_flags, mode_t mode)
568 {
569         struct db_context *result;
570         struct db_ctdb_ctx *db_ctdb;
571         char *db_path;
572
573         if (!lp_clustering()) {
574                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
575                 return NULL;
576         }
577
578         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
579                 DEBUG(0, ("talloc failed\n"));
580                 TALLOC_FREE(result);
581                 return NULL;
582         }
583
584         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
585                 DEBUG(0, ("talloc failed\n"));
586                 TALLOC_FREE(result);
587                 return NULL;
588         }
589
590         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
591                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
592                 TALLOC_FREE(result);
593                 return NULL;
594         }
595
596         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
597
598         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
599
600         /* only pass through specific flags */
601         tdb_flags &= TDB_SEQNUM;
602
603         /* honor permissions if user has specified O_CREAT */
604         if (open_flags & O_CREAT) {
605                 chmod(db_path, mode);
606         }
607
608         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
609         if (db_ctdb->wtdb == NULL) {
610                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
611                 TALLOC_FREE(result);
612                 return NULL;
613         }
614         talloc_free(db_path);
615
616         result->private_data = (void *)db_ctdb;
617         result->fetch_locked = db_ctdb_fetch_locked;
618         result->fetch = db_ctdb_fetch;
619         result->traverse = db_ctdb_traverse;
620         result->traverse_read = db_ctdb_traverse_read;
621         result->get_seqnum = db_ctdb_get_seqnum;
622         result->transaction_start = db_ctdb_trans_dummy;
623         result->transaction_commit = db_ctdb_trans_dummy;
624         result->transaction_cancel = db_ctdb_trans_dummy;
625
626         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
627                  name, db_ctdb->db_id));
628
629         return result;
630 }
631 #endif