69b3631c04d9fb9876a758879f963e2c997a1ea7
[obnox/samba-ctdb.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_ctx {
27         struct tdb_wrap *wtdb;
28         uint32 db_id;
29 };
30
31 struct db_ctdb_rec {
32         struct db_ctdb_ctx *ctdb_ctx;
33         struct ctdb_ltdb_header header;
34 };
35
36 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
37                                                TALLOC_CTX *mem_ctx,
38                                                TDB_DATA key,
39                                                bool persistent);
40
41 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
42 {
43         NTSTATUS status;
44         enum TDB_ERROR tret = tdb_error(tdb);
45
46         switch (tret) {
47         case TDB_ERR_EXISTS:
48                 status = NT_STATUS_OBJECT_NAME_COLLISION;
49                 break;
50         case TDB_ERR_NOEXIST:
51                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
52                 break;
53         default:
54                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
55                 break;
56         }
57
58         return status;
59 }
60
61 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
62 {
63         struct db_ctdb_rec *crec = talloc_get_type_abort(
64                 rec->private_data, struct db_ctdb_rec);
65         TDB_DATA cdata;
66         int ret;
67
68         cdata.dsize = sizeof(crec->header) + data.dsize;
69
70         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
71                 return NT_STATUS_NO_MEMORY;
72         }
73
74         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
75         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
76
77         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
78
79         SAFE_FREE(cdata.dptr);
80
81         return (ret == 0) ? NT_STATUS_OK
82                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
83 }
84
85
86 /* for persistent databases the store is a bit different. We have to
87    ask the ctdb daemon to push the record to all nodes after the
88    store */
89 static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
90 {
91         struct db_ctdb_rec *crec;
92         struct db_record *record;
93         TDB_DATA cdata;
94         int ret;
95         NTSTATUS status;
96         uint32_t count;
97         int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
98
99         for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec;
100              (count < max_retries) && !NT_STATUS_IS_OK(status);
101              count++)
102         {
103                 if (count > 0) {
104                         /* retry */
105                         /*
106                          * There is a hack here: We use rec as a memory
107                          * context and re-use it as the record struct ptr.
108                          * We don't free the record data allocated
109                          * in each turn. So all gets freed when the caller
110                          * releases the original record. This is because
111                          * we don't get the record passed in by reference
112                          * in the first place and the caller relies on
113                          * having to free the record himself.
114                          */
115                         record = fetch_locked_internal(crec->ctdb_ctx,
116                                                        rec,
117                                                        rec->key,
118                                                        true /* persistent */);
119                         if (record == NULL) {
120                                 DEBUG(5, ("fetch_locked_internal failed.\n"));
121                                 status = NT_STATUS_NO_MEMORY;
122                                 break;
123                         }
124                 }
125
126                 crec = talloc_get_type_abort(record->private_data,
127                                              struct db_ctdb_rec);
128
129                 cdata.dsize = sizeof(crec->header) + data.dsize;
130
131                 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
132                         return NT_STATUS_NO_MEMORY;
133                 }
134
135                 crec->header.rsn++;
136
137                 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
138                 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
139
140                 status = ctdbd_start_persistent_update(
141                                 messaging_ctdbd_connection(),
142                                 crec->ctdb_ctx->db_id,
143                                 rec->key,
144                                 cdata);
145
146                 if (NT_STATUS_IS_OK(status)) {
147                         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key,
148                                         cdata, TDB_REPLACE);
149                         status = (ret == 0)
150                                         ? NT_STATUS_OK
151                                         : tdb_error_to_ntstatus(
152                                                 crec->ctdb_ctx->wtdb->tdb);
153                 }
154
155                 /*
156                  * release the lock *now* in order to prevent deadlocks.
157                  *
158                  * There is a tradeoff: Usually, the record is still locked
159                  * after db->store operation. This lock is usually released
160                  * via the talloc destructor with the TALLOC_FREE to
161                  * the record. So we have two choices:
162                  *
163                  * - Either re-lock the record after the call to persistent_store
164                  *   or cancel_persistent update and this way not changing any
165                  *   assumptions callers may have about the state, but possibly
166                  *   introducing new race conditions.
167                  *
168                  * - Or don't lock the record again but just remove the
169                  *   talloc_destructor. This is less racy but assumes that
170                  *   the lock is always released via TALLOC_FREE of the record.
171                  *
172                  * I choose the first variant for now since it seems less racy.
173                  * We can't guarantee that we succeed in getting the lock
174                  * anyways. The only real danger here is that a caller
175                  * performs multiple store operations after a fetch_locked()
176                  * which is currently not the case.
177                  */
178                 tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key);
179                 talloc_set_destructor(record, NULL);
180
181                 /* now tell ctdbd to update this record on all other nodes */
182                 if (NT_STATUS_IS_OK(status)) {
183                         status = ctdbd_persistent_store(
184                                         messaging_ctdbd_connection(),
185                                         crec->ctdb_ctx->db_id,
186                                         rec->key,
187                                         cdata);
188                 } else {
189                         ctdbd_cancel_persistent_update(
190                                         messaging_ctdbd_connection(),
191                                         crec->ctdb_ctx->db_id,
192                                         rec->key,
193                                         cdata);
194                 }
195
196                 SAFE_FREE(cdata.dptr);
197         } /* retry-loop */
198
199         if (!NT_STATUS_IS_OK(status)) {
200                 DEBUG(5, ("ctdbd_persistent_store still failed after "
201                           "%d retries with error %s - giving up.\n",
202                           count, nt_errstr(status)));
203         }
204
205         SAFE_FREE(cdata.dptr);
206
207         return status;
208 }
209
210 static NTSTATUS db_ctdb_delete(struct db_record *rec)
211 {
212         TDB_DATA data;
213
214         /*
215          * We have to store the header with empty data. TODO: Fix the
216          * tdb-level cleanup
217          */
218
219         ZERO_STRUCT(data);
220
221         return db_ctdb_store(rec, data, 0);
222
223 }
224
225 static NTSTATUS db_ctdb_delete_persistent(struct db_record *rec)
226 {
227         TDB_DATA data;
228
229         /*
230          * We have to store the header with empty data. TODO: Fix the
231          * tdb-level cleanup
232          */
233
234         ZERO_STRUCT(data);
235
236         return db_ctdb_store_persistent(rec, data, 0);
237
238 }
239
240 static int db_ctdb_record_destr(struct db_record* data)
241 {
242         struct db_ctdb_rec *crec = talloc_get_type_abort(
243                 data->private_data, struct db_ctdb_rec);
244
245         DEBUG(10, (DEBUGLEVEL > 10
246                    ? "Unlocking db %u key %s\n"
247                    : "Unlocking db %u key %.20s\n",
248                    (int)crec->ctdb_ctx->db_id,
249                    hex_encode(data, (unsigned char *)data->key.dptr,
250                               data->key.dsize)));
251
252         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
253                 DEBUG(0, ("tdb_chainunlock failed\n"));
254                 return -1;
255         }
256
257         return 0;
258 }
259
260 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
261                                                TALLOC_CTX *mem_ctx,
262                                                TDB_DATA key,
263                                                bool persistent)
264 {
265         struct db_record *result;
266         struct db_ctdb_rec *crec;
267         NTSTATUS status;
268         TDB_DATA ctdb_data;
269         int migrate_attempts = 0;
270
271         if (!(result = talloc(mem_ctx, struct db_record))) {
272                 DEBUG(0, ("talloc failed\n"));
273                 return NULL;
274         }
275
276         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
277                 DEBUG(0, ("talloc failed\n"));
278                 TALLOC_FREE(result);
279                 return NULL;
280         }
281
282         result->private_data = (void *)crec;
283         crec->ctdb_ctx = ctx;
284
285         result->key.dsize = key.dsize;
286         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
287         if (result->key.dptr == NULL) {
288                 DEBUG(0, ("talloc failed\n"));
289                 TALLOC_FREE(result);
290                 return NULL;
291         }
292
293         /*
294          * Do a blocking lock on the record
295          */
296 again:
297
298         if (DEBUGLEVEL >= 10) {
299                 char *keystr = hex_encode(result, key.dptr, key.dsize);
300                 DEBUG(10, (DEBUGLEVEL > 10
301                            ? "Locking db %u key %s\n"
302                            : "Locking db %u key %.20s\n",
303                            (int)crec->ctdb_ctx->db_id, keystr));
304                 TALLOC_FREE(keystr);
305         }
306         
307         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
308                 DEBUG(3, ("tdb_chainlock failed\n"));
309                 TALLOC_FREE(result);
310                 return NULL;
311         }
312
313         if (persistent) {
314                 result->store = db_ctdb_store_persistent;
315                 result->delete_rec = db_ctdb_delete_persistent;
316         } else {
317                 result->store = db_ctdb_store;
318                 result->delete_rec = db_ctdb_delete;
319         }
320         talloc_set_destructor(result, db_ctdb_record_destr);
321
322         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
323
324         /*
325          * See if we have a valid record and we are the dmaster. If so, we can
326          * take the shortcut and just return it.
327          */
328
329         if ((ctdb_data.dptr == NULL) ||
330             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
331             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
332 #if 0
333             || (random() % 2 != 0)
334 #endif
335 ) {
336                 SAFE_FREE(ctdb_data.dptr);
337                 tdb_chainunlock(ctx->wtdb->tdb, key);
338                 talloc_set_destructor(result, NULL);
339
340                 migrate_attempts += 1;
341
342                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
343                            ctdb_data.dptr, ctdb_data.dptr ?
344                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
345                            get_my_vnn()));
346
347                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
348                 if (!NT_STATUS_IS_OK(status)) {
349                         DEBUG(5, ("ctdb_migrate failed: %s\n",
350                                   nt_errstr(status)));
351                         TALLOC_FREE(result);
352                         return NULL;
353                 }
354                 /* now its migrated, try again */
355                 goto again;
356         }
357
358         if (migrate_attempts > 10) {
359                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
360                           migrate_attempts));
361         }
362
363         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
364
365         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
366         result->value.dptr = NULL;
367
368         if ((result->value.dsize != 0)
369             && !(result->value.dptr = (uint8 *)talloc_memdup(
370                          result, ctdb_data.dptr + sizeof(crec->header),
371                          result->value.dsize))) {
372                 DEBUG(0, ("talloc failed\n"));
373                 TALLOC_FREE(result);
374         }
375
376         SAFE_FREE(ctdb_data.dptr);
377
378         return result;
379 }
380
381 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
382                                               TALLOC_CTX *mem_ctx,
383                                               TDB_DATA key)
384 {
385         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
386                                                         struct db_ctdb_ctx);
387
388         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
389 }
390
391 /*
392   fetch (unlocked, no migration) operation on ctdb
393  */
394 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
395                          TDB_DATA key, TDB_DATA *data)
396 {
397         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
398                                                         struct db_ctdb_ctx);
399         NTSTATUS status;
400         TDB_DATA ctdb_data;
401
402         /* try a direct fetch */
403         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
404
405         /*
406          * See if we have a valid record and we are the dmaster. If so, we can
407          * take the shortcut and just return it.
408          * we bypass the dmaster check for persistent databases
409          */
410         if ((ctdb_data.dptr != NULL) &&
411             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
412             (db->persistent ||
413              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
414                 /* we are the dmaster - avoid the ctdb protocol op */
415
416                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
417                 if (data->dsize == 0) {
418                         SAFE_FREE(ctdb_data.dptr);
419                         data->dptr = NULL;
420                         return 0;
421                 }
422
423                 data->dptr = (uint8 *)talloc_memdup(
424                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
425                         data->dsize);
426
427                 SAFE_FREE(ctdb_data.dptr);
428
429                 if (data->dptr == NULL) {
430                         return -1;
431                 }
432                 return 0;
433         }
434
435         SAFE_FREE(ctdb_data.dptr);
436
437         /* we weren't able to get it locally - ask ctdb to fetch it for us */
438         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
439         if (!NT_STATUS_IS_OK(status)) {
440                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
441                 return -1;
442         }
443
444         return 0;
445 }
446
447 struct traverse_state {
448         struct db_context *db;
449         int (*fn)(struct db_record *rec, void *private_data);
450         void *private_data;
451 };
452
453 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
454 {
455         struct traverse_state *state = (struct traverse_state *)private_data;
456         struct db_record *rec;
457         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
458         /* we have to give them a locked record to prevent races */
459         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
460         if (rec && rec->value.dsize > 0) {
461                 state->fn(rec, state->private_data);
462         }
463         talloc_free(tmp_ctx);
464 }
465
466 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
467                                         void *private_data)
468 {
469         struct traverse_state *state = (struct traverse_state *)private_data;
470         struct db_record *rec;
471         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
472         int ret = 0;
473         /* we have to give them a locked record to prevent races */
474         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
475         if (rec && rec->value.dsize > 0) {
476                 ret = state->fn(rec, state->private_data);
477         }
478         talloc_free(tmp_ctx);
479         return ret;
480 }
481
482 static int db_ctdb_traverse(struct db_context *db,
483                             int (*fn)(struct db_record *rec,
484                                       void *private_data),
485                             void *private_data)
486 {
487         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
488                                                         struct db_ctdb_ctx);
489         struct traverse_state state;
490
491         state.db = db;
492         state.fn = fn;
493         state.private_data = private_data;
494
495         if (db->persistent) {
496                 /* for persistent databases we don't need to do a ctdb traverse,
497                    we can do a faster local traverse */
498                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
499         }
500
501
502         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
503         return 0;
504 }
505
506 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
507 {
508         return NT_STATUS_MEDIA_WRITE_PROTECTED;
509 }
510
511 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
512 {
513         return NT_STATUS_MEDIA_WRITE_PROTECTED;
514 }
515
516 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
517 {
518         struct traverse_state *state = (struct traverse_state *)private_data;
519         struct db_record rec;
520         rec.key = key;
521         rec.value = data;
522         rec.store = db_ctdb_store_deny;
523         rec.delete_rec = db_ctdb_delete_deny;
524         rec.private_data = state->db;
525         state->fn(&rec, state->private_data);
526 }
527
528 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
529                                         void *private_data)
530 {
531         struct traverse_state *state = (struct traverse_state *)private_data;
532         struct db_record rec;
533         rec.key = kbuf;
534         rec.value = dbuf;
535         rec.store = db_ctdb_store_deny;
536         rec.delete_rec = db_ctdb_delete_deny;
537         rec.private_data = state->db;
538
539         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
540                 /* a deleted record */
541                 return 0;
542         }
543         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
544         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
545
546         return state->fn(&rec, state->private_data);
547 }
548
549 static int db_ctdb_traverse_read(struct db_context *db,
550                                  int (*fn)(struct db_record *rec,
551                                            void *private_data),
552                                  void *private_data)
553 {
554         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
555                                                         struct db_ctdb_ctx);
556         struct traverse_state state;
557
558         state.db = db;
559         state.fn = fn;
560         state.private_data = private_data;
561
562         if (db->persistent) {
563                 /* for persistent databases we don't need to do a ctdb traverse,
564                    we can do a faster local traverse */
565                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
566         }
567
568         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
569         return 0;
570 }
571
572 static int db_ctdb_get_seqnum(struct db_context *db)
573 {
574         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
575                                                         struct db_ctdb_ctx);
576         return tdb_get_seqnum(ctx->wtdb->tdb);
577 }
578
579 static int db_ctdb_trans_dummy(struct db_context *db)
580 {
581         /*
582          * Not implemented yet, just return ok
583          */
584         return 0;
585 }
586
587 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
588                                 const char *name,
589                                 int hash_size, int tdb_flags,
590                                 int open_flags, mode_t mode)
591 {
592         struct db_context *result;
593         struct db_ctdb_ctx *db_ctdb;
594         char *db_path;
595
596         if (!lp_clustering()) {
597                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
598                 return NULL;
599         }
600
601         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
602                 DEBUG(0, ("talloc failed\n"));
603                 TALLOC_FREE(result);
604                 return NULL;
605         }
606
607         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
608                 DEBUG(0, ("talloc failed\n"));
609                 TALLOC_FREE(result);
610                 return NULL;
611         }
612
613         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
614                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
615                 TALLOC_FREE(result);
616                 return NULL;
617         }
618
619         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
620
621         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
622
623         /* only pass through specific flags */
624         tdb_flags &= TDB_SEQNUM;
625
626         /* honor permissions if user has specified O_CREAT */
627         if (open_flags & O_CREAT) {
628                 chmod(db_path, mode);
629         }
630
631         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
632         if (db_ctdb->wtdb == NULL) {
633                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
634                 TALLOC_FREE(result);
635                 return NULL;
636         }
637         talloc_free(db_path);
638
639         result->private_data = (void *)db_ctdb;
640         result->fetch_locked = db_ctdb_fetch_locked;
641         result->fetch = db_ctdb_fetch;
642         result->traverse = db_ctdb_traverse;
643         result->traverse_read = db_ctdb_traverse_read;
644         result->get_seqnum = db_ctdb_get_seqnum;
645         result->transaction_start = db_ctdb_trans_dummy;
646         result->transaction_commit = db_ctdb_trans_dummy;
647         result->transaction_cancel = db_ctdb_trans_dummy;
648
649         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
650                  name, db_ctdb->db_id));
651
652         return result;
653 }
654 #endif