dbwrap ctdb: call db_ctdb_store() in db_ctdb_delete().
[samba.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_ctx {
27         struct tdb_wrap *wtdb;
28         uint32 db_id;
29 };
30
31 struct db_ctdb_rec {
32         struct db_ctdb_ctx *ctdb_ctx;
33         struct ctdb_ltdb_header header;
34 };
35
36 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
37                                                TALLOC_CTX *mem_ctx,
38                                                TDB_DATA key,
39                                                bool persistent);
40
41 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
42 {
43         struct db_ctdb_rec *crec = talloc_get_type_abort(
44                 rec->private_data, struct db_ctdb_rec);
45         TDB_DATA cdata;
46         int ret;
47
48         cdata.dsize = sizeof(crec->header) + data.dsize;
49
50         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
51                 return NT_STATUS_NO_MEMORY;
52         }
53
54         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
55         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
56
57         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
58
59         SAFE_FREE(cdata.dptr);
60
61         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
62 }
63
64
65 /* for persistent databases the store is a bit different. We have to
66    ask the ctdb daemon to push the record to all nodes after the
67    store */
68 static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
69 {
70         struct db_ctdb_rec *crec;
71         struct db_record *record;
72         TDB_DATA cdata;
73         int ret;
74         NTSTATUS status;
75         uint32_t count;
76         int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
77
78         for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec;
79              (count < max_retries) && !NT_STATUS_IS_OK(status);
80              count++)
81         {
82                 if (count > 0) {
83                         /* retry */
84                         /*
85                          * There is a hack here: We use rec as a memory
86                          * context and re-use it as the record struct ptr.
87                          * We don't free the record data allocated
88                          * in each turn. So all gets freed when the caller
89                          * releases the original record. This is because
90                          * we don't get the record passed in by reference
91                          * in the first place and the caller relies on
92                          * having to free the record himself.
93                          */
94                         record = fetch_locked_internal(crec->ctdb_ctx,
95                                                        rec,
96                                                        rec->key,
97                                                        true /* persistent */);
98                         if (record == NULL) {
99                                 DEBUG(5, ("fetch_locked_internal failed.\n"));
100                                 status = NT_STATUS_NO_MEMORY;
101                                 break;
102                         }
103                 }
104
105                 crec = talloc_get_type_abort(record->private_data,
106                                              struct db_ctdb_rec);
107
108                 cdata.dsize = sizeof(crec->header) + data.dsize;
109
110                 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
111                         return NT_STATUS_NO_MEMORY;
112                 }
113
114                 crec->header.rsn++;
115
116                 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
117                 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
118
119                 status = ctdbd_start_persistent_update(
120                                 messaging_ctdbd_connection(),
121                                 crec->ctdb_ctx->db_id,
122                                 rec->key,
123                                 cdata);
124
125                 if (NT_STATUS_IS_OK(status)) {
126                         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key,
127                                         cdata, TDB_REPLACE);
128                         status = (ret == 0) ? NT_STATUS_OK
129                                             : NT_STATUS_INTERNAL_DB_CORRUPTION;
130                 }
131
132                 /*
133                  * release the lock *now* in order to prevent deadlocks.
134                  *
135                  * There is a tradeoff: Usually, the record is still locked
136                  * after db->store operation. This lock is usually released
137                  * via the talloc destructor with the TALLOC_FREE to
138                  * the record. So we have two choices:
139                  *
140                  * - Either re-lock the record after the call to persistent_store
141                  *   or cancel_persistent update and this way not changing any
142                  *   assumptions callers may have about the state, but possibly
143                  *   introducing new race conditions.
144                  *
145                  * - Or don't lock the record again but just remove the
146                  *   talloc_destructor. This is less racy but assumes that
147                  *   the lock is always released via TALLOC_FREE of the record.
148                  *
149                  * I choose the first variant for now since it seems less racy.
150                  * We can't guarantee that we succeed in getting the lock
151                  * anyways. The only real danger here is that a caller
152                  * performs multiple store operations after a fetch_locked()
153                  * which is currently not the case.
154                  */
155                 tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key);
156                 talloc_set_destructor(record, NULL);
157
158                 /* now tell ctdbd to update this record on all other nodes */
159                 if (NT_STATUS_IS_OK(status)) {
160                         status = ctdbd_persistent_store(
161                                         messaging_ctdbd_connection(),
162                                         crec->ctdb_ctx->db_id,
163                                         rec->key,
164                                         cdata);
165                 } else {
166                         ctdbd_cancel_persistent_update(
167                                         messaging_ctdbd_connection(),
168                                         crec->ctdb_ctx->db_id,
169                                         rec->key,
170                                         cdata);
171                 }
172
173                 SAFE_FREE(cdata.dptr);
174         } /* retry-loop */
175
176         if (!NT_STATUS_IS_OK(status)) {
177                 DEBUG(5, ("ctdbd_persistent_store still failed after "
178                           "%d retries with error %s - giving up.\n",
179                           count, nt_errstr(status)));
180         }
181
182         SAFE_FREE(cdata.dptr);
183
184         return status;
185 }
186
187 static NTSTATUS db_ctdb_delete(struct db_record *rec)
188 {
189         TDB_DATA data;
190
191         /*
192          * We have to store the header with empty data. TODO: Fix the
193          * tdb-level cleanup
194          */
195
196         ZERO_STRUCT(data);
197
198         return db_ctdb_store(rec, data, 0);
199
200 }
201
202 static int db_ctdb_record_destr(struct db_record* data)
203 {
204         struct db_ctdb_rec *crec = talloc_get_type_abort(
205                 data->private_data, struct db_ctdb_rec);
206
207         DEBUG(10, (DEBUGLEVEL > 10
208                    ? "Unlocking db %u key %s\n"
209                    : "Unlocking db %u key %.20s\n",
210                    (int)crec->ctdb_ctx->db_id,
211                    hex_encode(data, (unsigned char *)data->key.dptr,
212                               data->key.dsize)));
213
214         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
215                 DEBUG(0, ("tdb_chainunlock failed\n"));
216                 return -1;
217         }
218
219         return 0;
220 }
221
222 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
223                                                TALLOC_CTX *mem_ctx,
224                                                TDB_DATA key,
225                                                bool persistent)
226 {
227         struct db_record *result;
228         struct db_ctdb_rec *crec;
229         NTSTATUS status;
230         TDB_DATA ctdb_data;
231         int migrate_attempts = 0;
232
233         if (!(result = talloc(mem_ctx, struct db_record))) {
234                 DEBUG(0, ("talloc failed\n"));
235                 return NULL;
236         }
237
238         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
239                 DEBUG(0, ("talloc failed\n"));
240                 TALLOC_FREE(result);
241                 return NULL;
242         }
243
244         result->private_data = (void *)crec;
245         crec->ctdb_ctx = ctx;
246
247         result->key.dsize = key.dsize;
248         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
249         if (result->key.dptr == NULL) {
250                 DEBUG(0, ("talloc failed\n"));
251                 TALLOC_FREE(result);
252                 return NULL;
253         }
254
255         /*
256          * Do a blocking lock on the record
257          */
258 again:
259
260         if (DEBUGLEVEL >= 10) {
261                 char *keystr = hex_encode(result, key.dptr, key.dsize);
262                 DEBUG(10, (DEBUGLEVEL > 10
263                            ? "Locking db %u key %s\n"
264                            : "Locking db %u key %.20s\n",
265                            (int)crec->ctdb_ctx->db_id, keystr));
266                 TALLOC_FREE(keystr);
267         }
268         
269         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
270                 DEBUG(3, ("tdb_chainlock failed\n"));
271                 TALLOC_FREE(result);
272                 return NULL;
273         }
274
275         if (persistent) {
276                 result->store = db_ctdb_store_persistent;
277         } else {
278                 result->store = db_ctdb_store;
279         }
280         result->delete_rec = db_ctdb_delete;
281         talloc_set_destructor(result, db_ctdb_record_destr);
282
283         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
284
285         /*
286          * See if we have a valid record and we are the dmaster. If so, we can
287          * take the shortcut and just return it.
288          */
289
290         if ((ctdb_data.dptr == NULL) ||
291             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
292             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
293 #if 0
294             || (random() % 2 != 0)
295 #endif
296 ) {
297                 SAFE_FREE(ctdb_data.dptr);
298                 tdb_chainunlock(ctx->wtdb->tdb, key);
299                 talloc_set_destructor(result, NULL);
300
301                 migrate_attempts += 1;
302
303                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
304                            ctdb_data.dptr, ctdb_data.dptr ?
305                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
306                            get_my_vnn()));
307
308                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
309                 if (!NT_STATUS_IS_OK(status)) {
310                         DEBUG(5, ("ctdb_migrate failed: %s\n",
311                                   nt_errstr(status)));
312                         TALLOC_FREE(result);
313                         return NULL;
314                 }
315                 /* now its migrated, try again */
316                 goto again;
317         }
318
319         if (migrate_attempts > 10) {
320                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
321                           migrate_attempts));
322         }
323
324         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
325
326         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
327         result->value.dptr = NULL;
328
329         if ((result->value.dsize != 0)
330             && !(result->value.dptr = (uint8 *)talloc_memdup(
331                          result, ctdb_data.dptr + sizeof(crec->header),
332                          result->value.dsize))) {
333                 DEBUG(0, ("talloc failed\n"));
334                 TALLOC_FREE(result);
335         }
336
337         SAFE_FREE(ctdb_data.dptr);
338
339         return result;
340 }
341
342 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
343                                               TALLOC_CTX *mem_ctx,
344                                               TDB_DATA key)
345 {
346         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
347                                                         struct db_ctdb_ctx);
348
349         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
350 }
351
352 /*
353   fetch (unlocked, no migration) operation on ctdb
354  */
355 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
356                          TDB_DATA key, TDB_DATA *data)
357 {
358         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
359                                                         struct db_ctdb_ctx);
360         NTSTATUS status;
361         TDB_DATA ctdb_data;
362
363         /* try a direct fetch */
364         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
365
366         /*
367          * See if we have a valid record and we are the dmaster. If so, we can
368          * take the shortcut and just return it.
369          * we bypass the dmaster check for persistent databases
370          */
371         if ((ctdb_data.dptr != NULL) &&
372             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
373             (db->persistent ||
374              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
375                 /* we are the dmaster - avoid the ctdb protocol op */
376
377                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
378                 if (data->dsize == 0) {
379                         SAFE_FREE(ctdb_data.dptr);
380                         data->dptr = NULL;
381                         return 0;
382                 }
383
384                 data->dptr = (uint8 *)talloc_memdup(
385                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
386                         data->dsize);
387
388                 SAFE_FREE(ctdb_data.dptr);
389
390                 if (data->dptr == NULL) {
391                         return -1;
392                 }
393                 return 0;
394         }
395
396         SAFE_FREE(ctdb_data.dptr);
397
398         /* we weren't able to get it locally - ask ctdb to fetch it for us */
399         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
400         if (!NT_STATUS_IS_OK(status)) {
401                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
402                 return -1;
403         }
404
405         return 0;
406 }
407
408 struct traverse_state {
409         struct db_context *db;
410         int (*fn)(struct db_record *rec, void *private_data);
411         void *private_data;
412 };
413
414 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
415 {
416         struct traverse_state *state = (struct traverse_state *)private_data;
417         struct db_record *rec;
418         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
419         /* we have to give them a locked record to prevent races */
420         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
421         if (rec && rec->value.dsize > 0) {
422                 state->fn(rec, state->private_data);
423         }
424         talloc_free(tmp_ctx);
425 }
426
427 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
428                                         void *private_data)
429 {
430         struct traverse_state *state = (struct traverse_state *)private_data;
431         struct db_record *rec;
432         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
433         int ret = 0;
434         /* we have to give them a locked record to prevent races */
435         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
436         if (rec && rec->value.dsize > 0) {
437                 ret = state->fn(rec, state->private_data);
438         }
439         talloc_free(tmp_ctx);
440         return ret;
441 }
442
443 static int db_ctdb_traverse(struct db_context *db,
444                             int (*fn)(struct db_record *rec,
445                                       void *private_data),
446                             void *private_data)
447 {
448         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
449                                                         struct db_ctdb_ctx);
450         struct traverse_state state;
451
452         state.db = db;
453         state.fn = fn;
454         state.private_data = private_data;
455
456         if (db->persistent) {
457                 /* for persistent databases we don't need to do a ctdb traverse,
458                    we can do a faster local traverse */
459                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
460         }
461
462
463         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
464         return 0;
465 }
466
467 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
468 {
469         return NT_STATUS_MEDIA_WRITE_PROTECTED;
470 }
471
472 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
473 {
474         return NT_STATUS_MEDIA_WRITE_PROTECTED;
475 }
476
477 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
478 {
479         struct traverse_state *state = (struct traverse_state *)private_data;
480         struct db_record rec;
481         rec.key = key;
482         rec.value = data;
483         rec.store = db_ctdb_store_deny;
484         rec.delete_rec = db_ctdb_delete_deny;
485         rec.private_data = state->db;
486         state->fn(&rec, state->private_data);
487 }
488
489 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
490                                         void *private_data)
491 {
492         struct traverse_state *state = (struct traverse_state *)private_data;
493         struct db_record rec;
494         rec.key = kbuf;
495         rec.value = dbuf;
496         rec.store = db_ctdb_store_deny;
497         rec.delete_rec = db_ctdb_delete_deny;
498         rec.private_data = state->db;
499
500         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
501                 /* a deleted record */
502                 return 0;
503         }
504         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
505         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
506
507         return state->fn(&rec, state->private_data);
508 }
509
510 static int db_ctdb_traverse_read(struct db_context *db,
511                                  int (*fn)(struct db_record *rec,
512                                            void *private_data),
513                                  void *private_data)
514 {
515         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
516                                                         struct db_ctdb_ctx);
517         struct traverse_state state;
518
519         state.db = db;
520         state.fn = fn;
521         state.private_data = private_data;
522
523         if (db->persistent) {
524                 /* for persistent databases we don't need to do a ctdb traverse,
525                    we can do a faster local traverse */
526                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
527         }
528
529         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
530         return 0;
531 }
532
533 static int db_ctdb_get_seqnum(struct db_context *db)
534 {
535         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
536                                                         struct db_ctdb_ctx);
537         return tdb_get_seqnum(ctx->wtdb->tdb);
538 }
539
540 static int db_ctdb_trans_dummy(struct db_context *db)
541 {
542         /*
543          * Not implemented yet, just return ok
544          */
545         return 0;
546 }
547
548 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
549                                 const char *name,
550                                 int hash_size, int tdb_flags,
551                                 int open_flags, mode_t mode)
552 {
553         struct db_context *result;
554         struct db_ctdb_ctx *db_ctdb;
555         char *db_path;
556
557         if (!lp_clustering()) {
558                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
559                 return NULL;
560         }
561
562         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
563                 DEBUG(0, ("talloc failed\n"));
564                 TALLOC_FREE(result);
565                 return NULL;
566         }
567
568         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
569                 DEBUG(0, ("talloc failed\n"));
570                 TALLOC_FREE(result);
571                 return NULL;
572         }
573
574         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
575                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
576                 TALLOC_FREE(result);
577                 return NULL;
578         }
579
580         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
581
582         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
583
584         /* only pass through specific flags */
585         tdb_flags &= TDB_SEQNUM;
586
587         /* honor permissions if user has specified O_CREAT */
588         if (open_flags & O_CREAT) {
589                 chmod(db_path, mode);
590         }
591
592         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
593         if (db_ctdb->wtdb == NULL) {
594                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
595                 TALLOC_FREE(result);
596                 return NULL;
597         }
598         talloc_free(db_path);
599
600         result->private_data = (void *)db_ctdb;
601         result->fetch_locked = db_ctdb_fetch_locked;
602         result->fetch = db_ctdb_fetch;
603         result->traverse = db_ctdb_traverse;
604         result->traverse_read = db_ctdb_traverse_read;
605         result->get_seqnum = db_ctdb_get_seqnum;
606         result->transaction_start = db_ctdb_trans_dummy;
607         result->transaction_commit = db_ctdb_trans_dummy;
608         result->transaction_cancel = db_ctdb_trans_dummy;
609
610         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
611                  name, db_ctdb->db_id));
612
613         return result;
614 }
615 #endif