Merge branch 'master' of ctdb into 'master' of samba
[samba.git] / ctdb / common / ctdb_ltdb.c
1 /* 
2    ctdb ltdb code
3
4    Copyright (C) Andrew Tridgell  2006
5    Copyright (C) Ronnie sahlberg  2011
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "../include/ctdb_private.h"
26 #include "db_wrap.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30   find an attached ctdb_db handle given a name
31  */
32 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
33 {
34         struct ctdb_db_context *tmp_db;
35         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
36                 if (strcmp(name, tmp_db->db_name) == 0) {
37                         return tmp_db;
38                 }
39         }
40         return NULL;
41 }
42
43
44 /*
45   return the lmaster given a key
46 */
47 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
48 {
49         uint32_t idx, lmaster;
50
51         idx = ctdb_hash(key) % ctdb->vnn_map->size;
52         lmaster = ctdb->vnn_map->map[idx];
53
54         return lmaster;
55 }
56
57
58 /*
59   construct an initial header for a record with no ltdb header yet
60 */
61 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db, 
62                                 TDB_DATA key,
63                                 struct ctdb_ltdb_header *header)
64 {
65         ZERO_STRUCTP(header);
66         /* initial dmaster is the lmaster */
67         header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
68         header->flags = CTDB_REC_FLAG_AUTOMATIC;
69 }
70
71
72 /*
73   fetch a record from the ltdb, separating out the header information
74   and returning the body of the record. A valid (initial) header is
75   returned if the record is not present
76 */
77 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, 
78                     TDB_DATA key, struct ctdb_ltdb_header *header, 
79                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
80 {
81         TDB_DATA rec;
82         struct ctdb_context *ctdb = ctdb_db->ctdb;
83
84         rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
85         if (rec.dsize < sizeof(*header)) {
86                 TDB_DATA d2;
87                 /* return an initial header */
88                 if (rec.dptr) free(rec.dptr);
89                 if (ctdb->vnn_map == NULL) {
90                         /* called from the client */
91                         ZERO_STRUCTP(data);
92                         header->dmaster = (uint32_t)-1;
93                         return -1;
94                 }
95                 ltdb_initial_header(ctdb_db, key, header);
96                 ZERO_STRUCT(d2);
97                 if (data) {
98                         *data = d2;
99                 }
100                 if (ctdb_db->persistent || header->dmaster == ctdb_db->ctdb->pnn) {
101                         ctdb_ltdb_store(ctdb_db, key, header, d2);
102                 }
103                 return 0;
104         }
105
106         *header = *(struct ctdb_ltdb_header *)rec.dptr;
107
108         if (data) {
109                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110                 data->dptr = talloc_memdup(mem_ctx, 
111                                            sizeof(struct ctdb_ltdb_header)+rec.dptr,
112                                            data->dsize);
113         }
114
115         free(rec.dptr);
116         if (data) {
117                 CTDB_NO_MEMORY(ctdb, data->dptr);
118         }
119
120         return 0;
121 }
122
123 /*
124   fetch a record from the ltdb, separating out the header information
125   and returning the body of the record.
126   if the record does not exist, *header will be NULL
127   and data = {0, NULL}
128 */
129 int ctdb_ltdb_fetch_with_header(struct ctdb_db_context *ctdb_db, 
130                     TDB_DATA key, struct ctdb_ltdb_header *header, 
131                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
132 {
133         TDB_DATA rec;
134
135         rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
136         if (rec.dsize < sizeof(*header)) {
137                 free(rec.dptr);
138
139                 data->dsize = 0;
140                 data->dptr = NULL;
141                 return -1;
142         }
143
144         *header = *(struct ctdb_ltdb_header *)rec.dptr;
145         if (data) {
146                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
147                 data->dptr = talloc_memdup(mem_ctx, 
148                                            sizeof(struct ctdb_ltdb_header)+rec.dptr,
149                                            data->dsize);
150         }
151
152         free(rec.dptr);
153
154         return 0;
155 }
156
157
158 /*
159   write a record to a normal database
160 */
161 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
162                     struct ctdb_ltdb_header *header, TDB_DATA data)
163 {
164         struct ctdb_context *ctdb = ctdb_db->ctdb;
165         TDB_DATA rec;
166         int ret;
167         bool seqnum_suppressed = false;
168
169         if (ctdb_db->ctdb_ltdb_store_fn) {
170                 return ctdb_db->ctdb_ltdb_store_fn(ctdb_db, key, header, data);
171         }
172
173         if (ctdb->flags & CTDB_FLAG_TORTURE) {
174                 struct ctdb_ltdb_header *h2;
175                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
176                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
177                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
178                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
179                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
180                 }
181                 if (rec.dptr) free(rec.dptr);
182         }
183
184         rec.dsize = sizeof(*header) + data.dsize;
185         rec.dptr = talloc_size(ctdb, rec.dsize);
186         CTDB_NO_MEMORY(ctdb, rec.dptr);
187
188         memcpy(rec.dptr, header, sizeof(*header));
189         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
190
191         /* Databases with seqnum updates enabled only get their seqnum
192            changes when/if we modify the data */
193         if (ctdb_db->seqnum_update != NULL) {
194                 TDB_DATA old;
195                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
196
197                 if ( (old.dsize == rec.dsize)
198                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
199                           rec.dptr+sizeof(struct ctdb_ltdb_header),
200                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
201                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
202                         seqnum_suppressed = true;
203                 }
204                 if (old.dptr) free(old.dptr);
205         }
206         ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
207         if (ret != 0) {
208                 DEBUG(DEBUG_ERR, (__location__ " Failed to store dynamic data\n"));
209         }
210         if (seqnum_suppressed) {
211                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
212         }
213
214         talloc_free(rec.dptr);
215
216         return ret;
217 }
218
219 /*
220   lock a record in the ltdb, given a key
221  */
222 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
223 {
224         return tdb_chainlock(ctdb_db->ltdb->tdb, key);
225 }
226
227 /*
228   unlock a record in the ltdb, given a key
229  */
230 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
231 {
232         int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
233         if (ret != 0) {
234                 DEBUG(DEBUG_ERR,("tdb_chainunlock failed on db %s [%s]\n", ctdb_db->db_name, tdb_errorstr(ctdb_db->ltdb->tdb)));
235         }
236         return ret;
237 }
238
239
240 /*
241   delete a record from a normal database
242 */
243 int ctdb_ltdb_delete(struct ctdb_db_context *ctdb_db, TDB_DATA key)
244 {
245         if (ctdb_db->persistent != 0) {
246                 DEBUG(DEBUG_ERR,("Trying to delete emty record in persistent database\n"));
247                 return 0;
248         }
249         if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
250                 DEBUG(DEBUG_ERR,("Failed to delete empty record."));
251                 return -1;
252         }
253         return 0;
254 }
255
256 int ctdb_trackingdb_add_pnn(struct ctdb_context *ctdb, TDB_DATA *data, uint32_t pnn)
257 {
258         int byte_pos = pnn / 8;
259         int bit_mask   = 1 << (pnn % 8);
260
261         if (byte_pos + 1 > data->dsize) {
262                 char *buf;
263
264                 buf = malloc(byte_pos + 1);
265                 memset(buf, 0, byte_pos + 1);
266                 if (buf == NULL) {
267                         DEBUG(DEBUG_ERR, ("Out of memory when allocating buffer of %d bytes for trackingdb\n", byte_pos + 1));
268                         return -1;
269                 }
270                 if (data->dptr != NULL) {
271                         memcpy(buf, data->dptr, data->dsize);
272                         free(data->dptr);
273                 }
274                 data->dptr  = (uint8_t *)buf;
275                 data->dsize = byte_pos + 1;
276         }
277
278         data->dptr[byte_pos] |= bit_mask;
279         return 0;
280 }
281
282 void ctdb_trackingdb_traverse(struct ctdb_context *ctdb, TDB_DATA data, ctdb_trackingdb_cb cb, void *private_data)
283 {
284         int i;
285
286         for(i = 0; i < data.dsize; i++) {
287                 int j;
288
289                 for (j=0; j<8; j++) {
290                         int mask = 1<<j;
291
292                         if (data.dptr[i] & mask) {
293                                 cb(ctdb, i * 8 + j, private_data);
294                         }
295                 }
296         }
297 }
298
299 /*
300   this is the dummy null procedure that all databases support
301 */
302 int ctdb_null_func(struct ctdb_call_info *call)
303 {
304         return 0;
305 }
306
307 /*
308   this is a plain fetch procedure that all databases support
309 */
310 int ctdb_fetch_func(struct ctdb_call_info *call)
311 {
312         call->reply_data = &call->record_data;
313         return 0;
314 }
315
316 /*
317   this is a plain fetch procedure that all databases support
318   this returns the full record including the ltdb header
319 */
320 int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
321 {
322         call->reply_data = talloc(call, TDB_DATA);
323         if (call->reply_data == NULL) {
324                 return -1;
325         }
326         call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
327         call->reply_data->dptr  = talloc_size(call->reply_data, call->reply_data->dsize);
328         if (call->reply_data->dptr == NULL) {
329                 return -1;
330         }
331         memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
332         memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
333
334         return 0;
335 }
336