4 Copyright (C) Andrew Tridgell 2006
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2 of the License, or (at your option) any later version.
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
31 find an attached ctdb_db handle given a name
33 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
35 struct ctdb_db_context *tmp_db;
36 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
37 if (strcmp(name, tmp_db->db_name) == 0) {
46 this is the dummy null procedure that all databases support
48 static int ctdb_null_func(struct ctdb_call_info *call)
55 return the lmaster given a key
57 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
59 uint32_t idx, lmaster;
61 idx = ctdb_hash(key) % ctdb->vnn_map->size;
62 lmaster = ctdb->vnn_map->map[idx];
69 construct an initial header for a record with no ltdb header yet
71 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db,
73 struct ctdb_ltdb_header *header)
76 /* initial dmaster is the lmaster */
77 header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
78 header->laccessor = header->dmaster;
84 fetch a record from the ltdb, separating out the header information
85 and returning the body of the record. A valid (initial) header is
86 returned if the record is not present
88 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
89 TDB_DATA key, struct ctdb_ltdb_header *header,
90 TALLOC_CTX *mem_ctx, TDB_DATA *data)
93 struct ctdb_context *ctdb = ctdb_db->ctdb;
95 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
96 if (rec.dsize < sizeof(*header)) {
98 /* return an initial header */
99 if (rec.dptr) free(rec.dptr);
100 ltdb_initial_header(ctdb_db, key, header);
105 ctdb_ltdb_store(ctdb_db, key, header, d2);
109 *header = *(struct ctdb_ltdb_header *)rec.dptr;
112 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
113 data->dptr = talloc_memdup(mem_ctx,
114 sizeof(struct ctdb_ltdb_header)+rec.dptr,
120 CTDB_NO_MEMORY(ctdb, data->dptr);
128 fetch a record from the ltdb, separating out the header information
129 and returning the body of the record. A valid (initial) header is
130 returned if the record is not present
132 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
133 struct ctdb_ltdb_header *header, TDB_DATA data)
135 struct ctdb_context *ctdb = ctdb_db->ctdb;
139 rec.dsize = sizeof(*header) + data.dsize;
140 rec.dptr = talloc_size(ctdb, rec.dsize);
141 CTDB_NO_MEMORY(ctdb, rec.dptr);
143 memcpy(rec.dptr, header, sizeof(*header));
144 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
146 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
147 talloc_free(rec.dptr);
154 lock a record in the ltdb, given a key
156 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
158 return tdb_chainlock(ctdb_db->ltdb->tdb, key);
162 unlock a record in the ltdb, given a key
164 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
166 int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
168 DEBUG(0,("tdb_chainunlock failed\n"));
173 struct lock_fetch_state {
174 struct ctdb_context *ctdb;
175 void (*recv_pkt)(void *, uint8_t *, uint32_t);
177 struct ctdb_req_header *hdr;
181 called when we should retry the operation
183 static void lock_fetch_callback(void *p)
185 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
186 state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
188 DEBUG(2,(__location__ " PACKET REQUEUED\n"));
193 do a non-blocking ltdb_lock, deferring this ctdb request until we
196 It does the following:
198 1) tries to get the chainlock. If it succeeds, then it returns 0
200 2) if it fails to get a chainlock immediately then it sets up a
201 non-blocking chainlock via ctdb_lockwait, and when it gets the
202 chainlock it re-submits this ctdb request to the main packet
205 This effectively queues all ctdb requests that cannot be
206 immediately satisfied until it can get the lock. This means that
207 the main ctdb daemon will not block waiting for a chainlock held by
210 There are 3 possible return values:
212 0: means that it got the lock immediately.
213 -1: means that it failed to get the lock, and won't retry
214 -2: means that it failed to get the lock immediately, but will retry
216 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
217 TDB_DATA key, struct ctdb_req_header *hdr,
218 void (*recv_pkt)(void *, uint8_t *, uint32_t ),
222 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
223 struct lockwait_handle *h;
224 struct lock_fetch_state *state;
226 ret = tdb_chainlock_nonblock(tdb, key);
229 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
230 /* a hard failure - don't try again */
234 /* when torturing, ensure we test the contended path */
235 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
238 tdb_chainunlock(tdb, key);
241 /* first the non-contended path */
246 state = talloc(ctdb_db, struct lock_fetch_state);
247 state->ctdb = ctdb_db->ctdb;
249 state->recv_pkt = recv_pkt;
250 state->recv_context = recv_context;
252 /* now the contended path */
253 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
255 tdb_chainunlock(tdb, key);
259 /* we need to move the packet off the temporary context in ctdb_recv_pkt(),
260 so it won't be freed yet */
261 talloc_steal(state, hdr);
262 talloc_steal(state, h);
264 /* now tell the caller than we will retry asynchronously */
269 a varient of ctdb_ltdb_lock_requeue that also fetches the record
271 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
272 TDB_DATA key, struct ctdb_ltdb_header *header,
273 struct ctdb_req_header *hdr, TDB_DATA *data,
274 void (*recv_pkt)(void *, uint8_t *, uint32_t ),
279 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, recv_context);
281 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
283 ctdb_ltdb_unlock(ctdb_db, key);
291 a client has asked to attach a new database
293 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
296 const char *db_name = (const char *)indata.dptr;
297 struct ctdb_db_context *ctdb_db, *tmp_db;
300 /* see if we already have this name */
301 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
302 if (strcmp(db_name, tmp_db->db_name) == 0) {
303 /* this is not an error */
304 outdata->dptr = (uint8_t *)&tmp_db->db_id;
305 outdata->dsize = sizeof(tmp_db->db_id);
310 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
311 CTDB_NO_MEMORY(ctdb, ctdb_db);
313 ctdb_db->ctdb = ctdb;
314 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
315 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
317 ctdb_db->db_id = ctdb_hash(&indata);
319 outdata->dptr = (uint8_t *)&ctdb_db->db_id;
320 outdata->dsize = sizeof(ctdb_db->db_id);
322 /* check for hash collisions */
323 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
324 if (tmp_db->db_id == ctdb_db->db_id) {
325 DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
326 db_name, tmp_db->db_name));
327 talloc_free(ctdb_db);
332 if (ctdb->db_directory == NULL) {
333 ctdb->db_directory = VARDIR "/ctdb";
336 /* make sure the db directory exists */
337 if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
338 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n",
339 ctdb->db_directory));
340 talloc_free(ctdb_db);
344 /* open the database */
345 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
349 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0,
350 TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
351 if (ctdb_db->ltdb == NULL) {
352 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
353 talloc_free(ctdb_db);
357 DLIST_ADD(ctdb->db_list, ctdb_db);
360 all databases support the "null" function. we need this in
361 order to do forced migration of records
363 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
365 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
366 talloc_free(ctdb_db);
370 /* tell all the other nodes about this database */
371 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNN, 0,
372 CTDB_CONTROL_DB_ATTACH, CTDB_CTRL_FLAG_NOREPLY,
375 DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));