dbwrap ctdb: don't retry when tdb_store failed in db_ctdb_persistent_store().
[samba.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_ctx {
27         struct tdb_wrap *wtdb;
28         uint32 db_id;
29 };
30
31 struct db_ctdb_rec {
32         struct db_ctdb_ctx *ctdb_ctx;
33         struct ctdb_ltdb_header header;
34 };
35
36 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
37                                                TALLOC_CTX *mem_ctx,
38                                                TDB_DATA key,
39                                                bool persistent);
40
41 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
42 {
43         NTSTATUS status;
44         enum TDB_ERROR tret = tdb_error(tdb);
45
46         switch (tret) {
47         case TDB_ERR_EXISTS:
48                 status = NT_STATUS_OBJECT_NAME_COLLISION;
49                 break;
50         case TDB_ERR_NOEXIST:
51                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
52                 break;
53         default:
54                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
55                 break;
56         }
57
58         return status;
59 }
60
61 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
62 {
63         struct db_ctdb_rec *crec = talloc_get_type_abort(
64                 rec->private_data, struct db_ctdb_rec);
65         TDB_DATA cdata;
66         int ret;
67
68         cdata.dsize = sizeof(crec->header) + data.dsize;
69
70         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
71                 return NT_STATUS_NO_MEMORY;
72         }
73
74         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
75         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
76
77         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
78
79         SAFE_FREE(cdata.dptr);
80
81         return (ret == 0) ? NT_STATUS_OK
82                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
83 }
84
85
86 /* for persistent databases the store is a bit different. We have to
87    ask the ctdb daemon to push the record to all nodes after the
88    store */
89 static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
90 {
91         struct db_ctdb_rec *crec;
92         struct db_record *record;
93         TDB_DATA cdata;
94         int ret;
95         NTSTATUS status;
96         uint32_t count;
97         int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
98
99         for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec;
100              (count < max_retries) && !NT_STATUS_IS_OK(status);
101              count++)
102         {
103                 if (count > 0) {
104                         /* retry */
105                         /*
106                          * There is a hack here: We use rec as a memory
107                          * context and re-use it as the record struct ptr.
108                          * We don't free the record data allocated
109                          * in each turn. So all gets freed when the caller
110                          * releases the original record. This is because
111                          * we don't get the record passed in by reference
112                          * in the first place and the caller relies on
113                          * having to free the record himself.
114                          */
115                         record = fetch_locked_internal(crec->ctdb_ctx,
116                                                        rec,
117                                                        rec->key,
118                                                        true /* persistent */);
119                         if (record == NULL) {
120                                 DEBUG(5, ("fetch_locked_internal failed.\n"));
121                                 status = NT_STATUS_NO_MEMORY;
122                                 break;
123                         }
124                 }
125
126                 crec = talloc_get_type_abort(record->private_data,
127                                              struct db_ctdb_rec);
128
129                 cdata.dsize = sizeof(crec->header) + data.dsize;
130
131                 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
132                         return NT_STATUS_NO_MEMORY;
133                 }
134
135                 crec->header.rsn++;
136
137                 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
138                 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
139
140                 status = ctdbd_start_persistent_update(
141                                 messaging_ctdbd_connection(),
142                                 crec->ctdb_ctx->db_id,
143                                 rec->key,
144                                 cdata);
145
146                 if (NT_STATUS_IS_OK(status)) {
147                         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key,
148                                         cdata, TDB_REPLACE);
149                         status = (ret == 0)
150                                         ? NT_STATUS_OK
151                                         : tdb_error_to_ntstatus(
152                                                 crec->ctdb_ctx->wtdb->tdb);
153                 }
154
155                 /*
156                  * release the lock *now* in order to prevent deadlocks.
157                  *
158                  * There is a tradeoff: Usually, the record is still locked
159                  * after db->store operation. This lock is usually released
160                  * via the talloc destructor with the TALLOC_FREE to
161                  * the record. So we have two choices:
162                  *
163                  * - Either re-lock the record after the call to persistent_store
164                  *   or cancel_persistent update and this way not changing any
165                  *   assumptions callers may have about the state, but possibly
166                  *   introducing new race conditions.
167                  *
168                  * - Or don't lock the record again but just remove the
169                  *   talloc_destructor. This is less racy but assumes that
170                  *   the lock is always released via TALLOC_FREE of the record.
171                  *
172                  * I choose the first variant for now since it seems less racy.
173                  * We can't guarantee that we succeed in getting the lock
174                  * anyways. The only real danger here is that a caller
175                  * performs multiple store operations after a fetch_locked()
176                  * which is currently not the case.
177                  */
178                 tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key);
179                 talloc_set_destructor(record, NULL);
180
181                 /* now tell ctdbd to update this record on all other nodes */
182                 if (NT_STATUS_IS_OK(status)) {
183                         status = ctdbd_persistent_store(
184                                         messaging_ctdbd_connection(),
185                                         crec->ctdb_ctx->db_id,
186                                         rec->key,
187                                         cdata);
188                 } else {
189                         ctdbd_cancel_persistent_update(
190                                         messaging_ctdbd_connection(),
191                                         crec->ctdb_ctx->db_id,
192                                         rec->key,
193                                         cdata);
194                         break;
195                 }
196
197                 SAFE_FREE(cdata.dptr);
198         } /* retry-loop */
199
200         if (!NT_STATUS_IS_OK(status)) {
201                 DEBUG(5, ("ctdbd_persistent_store still failed after "
202                           "%d retries with error %s - giving up.\n",
203                           count, nt_errstr(status)));
204         }
205
206         SAFE_FREE(cdata.dptr);
207
208         return status;
209 }
210
211 static NTSTATUS db_ctdb_delete(struct db_record *rec)
212 {
213         TDB_DATA data;
214
215         /*
216          * We have to store the header with empty data. TODO: Fix the
217          * tdb-level cleanup
218          */
219
220         ZERO_STRUCT(data);
221
222         return db_ctdb_store(rec, data, 0);
223
224 }
225
226 static NTSTATUS db_ctdb_delete_persistent(struct db_record *rec)
227 {
228         TDB_DATA data;
229
230         /*
231          * We have to store the header with empty data. TODO: Fix the
232          * tdb-level cleanup
233          */
234
235         ZERO_STRUCT(data);
236
237         return db_ctdb_store_persistent(rec, data, 0);
238
239 }
240
241 static int db_ctdb_record_destr(struct db_record* data)
242 {
243         struct db_ctdb_rec *crec = talloc_get_type_abort(
244                 data->private_data, struct db_ctdb_rec);
245
246         DEBUG(10, (DEBUGLEVEL > 10
247                    ? "Unlocking db %u key %s\n"
248                    : "Unlocking db %u key %.20s\n",
249                    (int)crec->ctdb_ctx->db_id,
250                    hex_encode(data, (unsigned char *)data->key.dptr,
251                               data->key.dsize)));
252
253         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
254                 DEBUG(0, ("tdb_chainunlock failed\n"));
255                 return -1;
256         }
257
258         return 0;
259 }
260
261 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
262                                                TALLOC_CTX *mem_ctx,
263                                                TDB_DATA key,
264                                                bool persistent)
265 {
266         struct db_record *result;
267         struct db_ctdb_rec *crec;
268         NTSTATUS status;
269         TDB_DATA ctdb_data;
270         int migrate_attempts = 0;
271
272         if (!(result = talloc(mem_ctx, struct db_record))) {
273                 DEBUG(0, ("talloc failed\n"));
274                 return NULL;
275         }
276
277         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
278                 DEBUG(0, ("talloc failed\n"));
279                 TALLOC_FREE(result);
280                 return NULL;
281         }
282
283         result->private_data = (void *)crec;
284         crec->ctdb_ctx = ctx;
285
286         result->key.dsize = key.dsize;
287         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
288         if (result->key.dptr == NULL) {
289                 DEBUG(0, ("talloc failed\n"));
290                 TALLOC_FREE(result);
291                 return NULL;
292         }
293
294         /*
295          * Do a blocking lock on the record
296          */
297 again:
298
299         if (DEBUGLEVEL >= 10) {
300                 char *keystr = hex_encode(result, key.dptr, key.dsize);
301                 DEBUG(10, (DEBUGLEVEL > 10
302                            ? "Locking db %u key %s\n"
303                            : "Locking db %u key %.20s\n",
304                            (int)crec->ctdb_ctx->db_id, keystr));
305                 TALLOC_FREE(keystr);
306         }
307         
308         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
309                 DEBUG(3, ("tdb_chainlock failed\n"));
310                 TALLOC_FREE(result);
311                 return NULL;
312         }
313
314         if (persistent) {
315                 result->store = db_ctdb_store_persistent;
316                 result->delete_rec = db_ctdb_delete_persistent;
317         } else {
318                 result->store = db_ctdb_store;
319                 result->delete_rec = db_ctdb_delete;
320         }
321         talloc_set_destructor(result, db_ctdb_record_destr);
322
323         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
324
325         /*
326          * See if we have a valid record and we are the dmaster. If so, we can
327          * take the shortcut and just return it.
328          */
329
330         if ((ctdb_data.dptr == NULL) ||
331             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
332             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
333 #if 0
334             || (random() % 2 != 0)
335 #endif
336 ) {
337                 SAFE_FREE(ctdb_data.dptr);
338                 tdb_chainunlock(ctx->wtdb->tdb, key);
339                 talloc_set_destructor(result, NULL);
340
341                 migrate_attempts += 1;
342
343                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
344                            ctdb_data.dptr, ctdb_data.dptr ?
345                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
346                            get_my_vnn()));
347
348                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
349                 if (!NT_STATUS_IS_OK(status)) {
350                         DEBUG(5, ("ctdb_migrate failed: %s\n",
351                                   nt_errstr(status)));
352                         TALLOC_FREE(result);
353                         return NULL;
354                 }
355                 /* now its migrated, try again */
356                 goto again;
357         }
358
359         if (migrate_attempts > 10) {
360                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
361                           migrate_attempts));
362         }
363
364         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
365
366         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
367         result->value.dptr = NULL;
368
369         if ((result->value.dsize != 0)
370             && !(result->value.dptr = (uint8 *)talloc_memdup(
371                          result, ctdb_data.dptr + sizeof(crec->header),
372                          result->value.dsize))) {
373                 DEBUG(0, ("talloc failed\n"));
374                 TALLOC_FREE(result);
375         }
376
377         SAFE_FREE(ctdb_data.dptr);
378
379         return result;
380 }
381
382 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
383                                               TALLOC_CTX *mem_ctx,
384                                               TDB_DATA key)
385 {
386         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
387                                                         struct db_ctdb_ctx);
388
389         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
390 }
391
392 /*
393   fetch (unlocked, no migration) operation on ctdb
394  */
395 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
396                          TDB_DATA key, TDB_DATA *data)
397 {
398         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
399                                                         struct db_ctdb_ctx);
400         NTSTATUS status;
401         TDB_DATA ctdb_data;
402
403         /* try a direct fetch */
404         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
405
406         /*
407          * See if we have a valid record and we are the dmaster. If so, we can
408          * take the shortcut and just return it.
409          * we bypass the dmaster check for persistent databases
410          */
411         if ((ctdb_data.dptr != NULL) &&
412             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
413             (db->persistent ||
414              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
415                 /* we are the dmaster - avoid the ctdb protocol op */
416
417                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
418                 if (data->dsize == 0) {
419                         SAFE_FREE(ctdb_data.dptr);
420                         data->dptr = NULL;
421                         return 0;
422                 }
423
424                 data->dptr = (uint8 *)talloc_memdup(
425                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
426                         data->dsize);
427
428                 SAFE_FREE(ctdb_data.dptr);
429
430                 if (data->dptr == NULL) {
431                         return -1;
432                 }
433                 return 0;
434         }
435
436         SAFE_FREE(ctdb_data.dptr);
437
438         /* we weren't able to get it locally - ask ctdb to fetch it for us */
439         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
440         if (!NT_STATUS_IS_OK(status)) {
441                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
442                 return -1;
443         }
444
445         return 0;
446 }
447
448 struct traverse_state {
449         struct db_context *db;
450         int (*fn)(struct db_record *rec, void *private_data);
451         void *private_data;
452 };
453
454 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
455 {
456         struct traverse_state *state = (struct traverse_state *)private_data;
457         struct db_record *rec;
458         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
459         /* we have to give them a locked record to prevent races */
460         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
461         if (rec && rec->value.dsize > 0) {
462                 state->fn(rec, state->private_data);
463         }
464         talloc_free(tmp_ctx);
465 }
466
467 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
468                                         void *private_data)
469 {
470         struct traverse_state *state = (struct traverse_state *)private_data;
471         struct db_record *rec;
472         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
473         int ret = 0;
474         /* we have to give them a locked record to prevent races */
475         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
476         if (rec && rec->value.dsize > 0) {
477                 ret = state->fn(rec, state->private_data);
478         }
479         talloc_free(tmp_ctx);
480         return ret;
481 }
482
483 static int db_ctdb_traverse(struct db_context *db,
484                             int (*fn)(struct db_record *rec,
485                                       void *private_data),
486                             void *private_data)
487 {
488         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
489                                                         struct db_ctdb_ctx);
490         struct traverse_state state;
491
492         state.db = db;
493         state.fn = fn;
494         state.private_data = private_data;
495
496         if (db->persistent) {
497                 /* for persistent databases we don't need to do a ctdb traverse,
498                    we can do a faster local traverse */
499                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
500         }
501
502
503         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
504         return 0;
505 }
506
507 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
508 {
509         return NT_STATUS_MEDIA_WRITE_PROTECTED;
510 }
511
512 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
513 {
514         return NT_STATUS_MEDIA_WRITE_PROTECTED;
515 }
516
517 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
518 {
519         struct traverse_state *state = (struct traverse_state *)private_data;
520         struct db_record rec;
521         rec.key = key;
522         rec.value = data;
523         rec.store = db_ctdb_store_deny;
524         rec.delete_rec = db_ctdb_delete_deny;
525         rec.private_data = state->db;
526         state->fn(&rec, state->private_data);
527 }
528
529 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
530                                         void *private_data)
531 {
532         struct traverse_state *state = (struct traverse_state *)private_data;
533         struct db_record rec;
534         rec.key = kbuf;
535         rec.value = dbuf;
536         rec.store = db_ctdb_store_deny;
537         rec.delete_rec = db_ctdb_delete_deny;
538         rec.private_data = state->db;
539
540         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
541                 /* a deleted record */
542                 return 0;
543         }
544         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
545         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
546
547         return state->fn(&rec, state->private_data);
548 }
549
550 static int db_ctdb_traverse_read(struct db_context *db,
551                                  int (*fn)(struct db_record *rec,
552                                            void *private_data),
553                                  void *private_data)
554 {
555         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
556                                                         struct db_ctdb_ctx);
557         struct traverse_state state;
558
559         state.db = db;
560         state.fn = fn;
561         state.private_data = private_data;
562
563         if (db->persistent) {
564                 /* for persistent databases we don't need to do a ctdb traverse,
565                    we can do a faster local traverse */
566                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
567         }
568
569         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
570         return 0;
571 }
572
573 static int db_ctdb_get_seqnum(struct db_context *db)
574 {
575         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
576                                                         struct db_ctdb_ctx);
577         return tdb_get_seqnum(ctx->wtdb->tdb);
578 }
579
580 static int db_ctdb_trans_dummy(struct db_context *db)
581 {
582         /*
583          * Not implemented yet, just return ok
584          */
585         return 0;
586 }
587
588 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
589                                 const char *name,
590                                 int hash_size, int tdb_flags,
591                                 int open_flags, mode_t mode)
592 {
593         struct db_context *result;
594         struct db_ctdb_ctx *db_ctdb;
595         char *db_path;
596
597         if (!lp_clustering()) {
598                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
599                 return NULL;
600         }
601
602         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
603                 DEBUG(0, ("talloc failed\n"));
604                 TALLOC_FREE(result);
605                 return NULL;
606         }
607
608         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
609                 DEBUG(0, ("talloc failed\n"));
610                 TALLOC_FREE(result);
611                 return NULL;
612         }
613
614         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
615                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
616                 TALLOC_FREE(result);
617                 return NULL;
618         }
619
620         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
621
622         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
623
624         /* only pass through specific flags */
625         tdb_flags &= TDB_SEQNUM;
626
627         /* honor permissions if user has specified O_CREAT */
628         if (open_flags & O_CREAT) {
629                 chmod(db_path, mode);
630         }
631
632         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
633         if (db_ctdb->wtdb == NULL) {
634                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
635                 TALLOC_FREE(result);
636                 return NULL;
637         }
638         talloc_free(db_path);
639
640         result->private_data = (void *)db_ctdb;
641         result->fetch_locked = db_ctdb_fetch_locked;
642         result->fetch = db_ctdb_fetch;
643         result->traverse = db_ctdb_traverse;
644         result->traverse_read = db_ctdb_traverse_read;
645         result->get_seqnum = db_ctdb_get_seqnum;
646         result->transaction_start = db_ctdb_trans_dummy;
647         result->transaction_commit = db_ctdb_trans_dummy;
648         result->transaction_cancel = db_ctdb_trans_dummy;
649
650         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
651                  name, db_ctdb->db_id));
652
653         return result;
654 }
655 #endif