2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
30 this is the dummy null procedure that all databases support
/* call handler taking a ctdb_call_info; ctdb_control_db_attach() installs
   it on every attached database under the id CTDB_NULL_FUNC */
32 static int ctdb_null_func(struct ctdb_call_info *call)
38 this is a plain fetch procedure that all databases support
/* call handler that ctdb_control_db_attach() installs on every attached
   database under the id CTDB_FETCH_FUNC */
40 static int ctdb_fetch_func(struct ctdb_call_info *call)
/* the reply points at the record data held in the call itself - nothing is copied */
42 call->reply_data = &call->record_data;
/* state for a request that is waiting for a chainlock: filled in by
   ctdb_ltdb_lock_requeue() and read back by lock_fetch_callback() */
48 struct lock_fetch_state {
/* daemon context; the callback reads ctdb->vnn_map->generation through it */
49 struct ctdb_context *ctdb;
/* handler the held packet is handed back to when the callback runs */
50 void (*recv_pkt)(void *, struct ctdb_req_header *);
/* the request packet being held */
52 struct ctdb_req_header *hdr;
/* when true the callback skips the generation comparison */
54 bool ignore_generation;
58 called when we should retry the operation
/* callback given to ctdb_lockwait() by ctdb_ltdb_lock_requeue();
   p is the struct lock_fetch_state that was set up there */
60 static void lock_fetch_callback(void *p)
62 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
/* unless told to ignore it, compare the generation recorded when the
   request was queued with the current vnn_map generation */
63 if (!state->ignore_generation &&
64 state->generation != state->ctdb->vnn_map->generation) {
65 DEBUG(0,("Discarding previous generation lockwait packet\n"));
/* the generation changed while waiting: the held packet is freed */
66 talloc_free(state->hdr);
/* hand the held packet back to the handler that was registered for it */
69 state->recv_pkt(state->recv_context, state->hdr);
70 DEBUG(2,(__location__ " PACKET REQUEUED\n"));
75 do a non-blocking ltdb_lock, deferring this ctdb request until we
78 It does the following:
80 1) tries to get the chainlock. If it succeeds, then it returns 0
82 2) if it fails to get a chainlock immediately then it sets up a
83 non-blocking chainlock via ctdb_lockwait, and when it gets the
84 chainlock it re-submits this ctdb request to the main packet
87 This effectively queues all ctdb requests that cannot be
88 immediately satisfied until it can get the lock. This means that
89 the main ctdb daemon will not block waiting for a chainlock held by
92 There are 3 possible return values:
94 0: means that it got the lock immediately.
95 -1: means that it failed to get the lock, and won't retry
96 -2: means that it failed to get the lock immediately, but will retry
/*
  parameters:
    ctdb_db           - database whose local tdb (ctdb_db->ltdb->tdb) is locked
    key               - key whose chain is locked
    hdr               - the request packet; on the contended path it is kept
                        and later passed to recv_pkt by lock_fetch_callback()
    recv_pkt          - handler the packet is handed back to
    recv_context      - first argument given to recv_pkt
    ignore_generation - when true the requeued packet is delivered even if
                        the vnn_map generation has changed in the meantime
  returns 0 (got the lock immediately), -1 (failed, will not retry) or
  -2 (did not get the lock immediately, will retry)
*/
98 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
99 TDB_DATA key, struct ctdb_req_header *hdr,
100 void (*recv_pkt)(void *, struct ctdb_req_header *),
101 void *recv_context, bool ignore_generation)
104 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
105 struct lockwait_handle *h;
106 struct lock_fetch_state *state;
/* try to take the chainlock without blocking */
108 ret = tdb_chainlock_nonblock(tdb, key);
111 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
112 /* an errno other than EACCES, EAGAIN or EDEADLK is a hard failure - don't try again */
116 /* when torturing, ensure we test the contended path (the chainlock is released again below) */
117 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
120 tdb_chainunlock(tdb, key);
123 /* first the non-contended path */
/* the state is allocated as a talloc child of hdr.
   NOTE(review): the result is dereferenced on the next line without a NULL check */
128 state = talloc(hdr, struct lock_fetch_state);
129 state->ctdb = ctdb_db->ctdb;
131 state->recv_pkt = recv_pkt;
132 state->recv_context = recv_context;
/* remember the generation at queue time; lock_fetch_callback() compares
   it with the generation current when the lock is finally obtained */
133 state->generation = ctdb_db->ctdb->vnn_map->generation;
134 state->ignore_generation = ignore_generation;
136 /* now the contended path */
137 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
139 tdb_chainunlock(tdb, key);
143 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
144 so it won't be freed yet */
/* NOTE(review): state was allocated above with hdr as its talloc parent and
   hdr is now moved under state - confirm this parent/child cycle is intended */
145 talloc_steal(state, hdr);
/* the lockwait handle is parented to the state as well */
146 talloc_steal(state, h);
148 /* now tell the caller that we will retry asynchronously */
153 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
  ctdb_db, key, hdr, recv_pkt, recv_context and ignore_generation are
  forwarded unchanged to ctdb_ltdb_lock_requeue(); header and data are
  handed to ctdb_ltdb_fetch() together with ctdb_db, key and hdr
*/
155 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
156 TDB_DATA key, struct ctdb_ltdb_header *header,
157 struct ctdb_req_header *hdr, TDB_DATA *data,
158 void (*recv_pkt)(void *, struct ctdb_req_header *),
159 void *recv_context, bool ignore_generation)
/* take the chainlock, or queue the request until it can be taken */
163 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
164 recv_context, ignore_generation);
/* read the record */
166 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
/* presumably gives back the chainlock taken above - TODO confirm when this runs */
168 ctdb_ltdb_unlock(ctdb_db, key);
176 paranoid check to see if the db is empty
/* logs the tdb name and calls ctdb_fatal() with "database not empty on attach";
   used by ctdb_control_db_attach() right after the tdb has been opened */
178 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
180 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
/* traverse with no callback and no private data; the result is used as a record count */
181 int count = tdb_traverse_read(tdb, NULL, NULL);
183 DEBUG(0,(__location__ " tdb '%s' not empty on attach! aborting\n",
185 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
190 a client has asked to attach a new database
/*
  ctdb   - daemon context; the new database is added to ctdb->db_list
  indata - request payload; indata.dptr is read as the database name

  outdata (dptr/dsize) is pointed at the db_id of the database, whether it
  already existed or has just been created.

  order of work: refuse if this node is inactive, reuse an existing
  database of the same name, otherwise allocate a context, derive the id
  by hashing indata, reject id collisions, make sure the db directory
  exists, open the tdb, check it is empty, link it into ctdb->db_list,
  register the null and fetch calls, and broadcast the attach to all nodes
*/
192 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
/* NOTE(review): the payload is used as a NUL-terminated string by strcmp()
   and talloc_strdup() below; no check of indata.dsize or of the terminator
   is visible here - confirm the caller guarantees it */
195 const char *db_name = (const char *)indata.dptr;
196 struct ctdb_db_context *ctdb_db, *tmp_db;
/* the node entry indexed by our own pnn */
197 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
200 /* If the node is inactive it is not part of the cluster
201 and we should not allow clients to attach to any
204 if (node->flags & NODE_FLAGS_INACTIVE) {
205 DEBUG(0,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
210 /* see if we already have this name */
211 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
212 if (strcmp(db_name, tmp_db->db_name) == 0) {
213 /* this is not an error */
/* the reply points straight at the id stored in the existing context, it is not a copy */
214 outdata->dptr = (uint8_t *)&tmp_db->db_id;
215 outdata->dsize = sizeof(tmp_db->db_id);
/* no database of that name yet: build a new context owned by ctdb */
220 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
221 CTDB_NO_MEMORY(ctdb, ctdb_db);
223 ctdb_db->ctdb = ctdb;
224 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
225 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
/* the id is the hash of the whole request payload as it was sent */
227 ctdb_db->db_id = ctdb_hash(&indata);
/* as above, the reply points at the id inside the context */
229 outdata->dptr = (uint8_t *)&ctdb_db->db_id;
230 outdata->dsize = sizeof(ctdb_db->db_id);
232 /* check for hash collisions */
233 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
/* the name loop above found no match, so an equal id here belongs to a different name */
234 if (tmp_db->db_id == ctdb_db->db_id) {
235 DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
236 tmp_db->db_id, db_name, tmp_db->db_name));
237 talloc_free(ctdb_db);
/* fall back to VARDIR "/ctdb" when no db directory has been configured */
242 if (ctdb->db_directory == NULL) {
243 ctdb->db_directory = VARDIR "/ctdb";
246 /* make sure the db directory exists (an already existing directory is fine) */
247 if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
248 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n",
249 ctdb->db_directory));
250 talloc_free(ctdb_db);
254 /* open the database */
/* the path has the form "%s/%s.%u"; the tdb is opened with
   TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, mode 0666 and the hash size taken
   from the database_hash_size tunable.
   NOTE(review): tdb_wrap_open() is given ctdb, not ctdb_db, as its first
   argument - if that is the talloc parent, the talloc_free(ctdb_db) calls
   on the later error paths do not release the wrap; confirm */
255 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
259 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
260 ctdb->tunable.database_hash_size,
261 TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
262 if (ctdb_db->ltdb == NULL) {
263 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
264 talloc_free(ctdb_db);
/* ctdb_check_db_empty() calls ctdb_fatal() over a database that is not empty */
268 ctdb_check_db_empty(ctdb_db);
/* from here on the context is reachable through ctdb->db_list */
270 DLIST_ADD(ctdb->db_list, ctdb_db);
273 all databases support the "null" function. we need this in
274 order to do forced migration of records
276 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
278 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
/* NOTE(review): ctdb_db was already added to ctdb->db_list and no
   DLIST_REMOVE is visible before this free - confirm a destructor unlinks it */
279 talloc_free(ctdb_db);
284 all databases support the "fetch" function. we need this
285 for efficient Samba3 ctdb fetch
287 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
289 DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
/* NOTE(review): same concern as the free on the null-function error path */
290 talloc_free(ctdb_db);
294 /* tell all the other nodes about this database (sent with CTDB_CTRL_FLAG_NOREPLY, so no answer is awaited) */
295 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
296 CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
299 DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
306 called when a broadcast seqnum update comes in
/*
  ctdb    - daemon context
  db_id   - id of the database whose sequence number changed
  srcnode - node the update came from; updates from our own pnn are ignored
*/
308 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
310 struct ctdb_db_context *ctdb_db;
311 if (srcnode == ctdb->pnn) {
312 /* don't update ourselves! */
/* look the database up by id; an unknown id is logged below */
316 ctdb_db = find_ctdb_db(ctdb, db_id);
318 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
/* bump the local tdb's sequence number, then cache the new value so that
   ctdb_ltdb_seqnum_check(), which compares the tdb's seqnum against
   ctdb_db->seqnum, does not see this bump as a local change */
322 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
323 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
328 timer to check for seqnum changes in a ltdb and propagate them
/*
  timed event handler; p is the struct ctdb_db_context the timer was
  registered with. ev, te and t are not used in the lines shown here.
*/
330 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
331 struct timeval t, void *p)
333 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
334 struct ctdb_context *ctdb = ctdb_db->ctdb;
/* current seqnum of the local tdb, compared with the value cached in ctdb_db->seqnum */
335 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
336 if (new_seqnum != ctdb_db->seqnum) {
337 /* something has changed - propagate it */
/* the payload of the control is just the 32 bit db_id */
339 data.dptr = (uint8_t *)&ctdb_db->db_id;
340 data.dsize = sizeof(uint32_t);
/* CTDB_CONTROL_UPDATE_SEQNUM goes to CTDB_BROADCAST_VNNMAP with CTDB_CTRL_FLAG_NOREPLY */
341 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
342 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
/* cache the value that was just compared */
345 ctdb_db->seqnum = new_seqnum;
347 /* setup a new timer: this handler again, same ctdb_db, offset taken from the seqnum_frequency tunable */
349 event_add_timed(ctdb->ev, ctdb_db,
350 timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
351 ctdb_ltdb_seqnum_check, ctdb_db);
355 enable seqnum handling on this db
/*
  ctdb  - daemon context
  db_id - id of the database to enable sequence numbers on
*/
357 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
359 struct ctdb_db_context *ctdb_db;
/* look the database up by id; an unknown id is logged below */
360 ctdb_db = find_ctdb_db(ctdb, db_id);
362 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* a timer is only added while ctdb_db->te is still NULL */
366 if (ctdb_db->te == NULL) {
/* ctdb_ltdb_seqnum_check() is scheduled with the offset taken from the seqnum_frequency tunable */
368 event_add_timed(ctdb->ev, ctdb_db,
369 timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
370 ctdb_ltdb_seqnum_check, ctdb_db);
/* turn on seqnum support in the tdb and cache its current value as the
   baseline ctdb_ltdb_seqnum_check() compares against */
373 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
374 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);