2 wait for a tdb chain lock
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "lib/util/dlinklist.h"
27 #include "../include/ctdb_private.h"
30 struct lockwait_handle {
31 struct lockwait_handle *next, *prev;
32 struct ctdb_context *ctdb;
33 struct ctdb_db_context *ctdb_db;
38 void (*callback)(void *);
40 struct timeval start_time;
43 /* If we managed to obtain a lock, find any overflow records which wanted the
44 * same one and do all the callbacks at once. */
45 static void do_overflow(struct ctdb_db_context *ctdb_db,
48 struct lockwait_handle *i, *next;
49 TALLOC_CTX *tmp_ctx = talloc_new(ctdb_db);
51 for (i = ctdb_db->lockwait_overflow; i; i = next) {
52 /* Careful: destructor removes it from list! */
54 if (key.dsize == i->key.dsize
55 && memcmp(key.dptr, i->key.dptr, key.dsize) == 0) {
56 /* Callback might free them, so reparent. */
57 talloc_steal(tmp_ctx, i);
58 i->callback(i->private_data);
62 /* This will free them if callback didn't. */
65 /* Remove one from the overflow queue if there is one. */
66 if (ctdb_db->lockwait_overflow) {
67 i = ctdb_db->lockwait_overflow;
68 ctdb_lockwait(ctdb_db, i->key, i->callback, i->private_data);
73 static int lockwait_destructor(struct lockwait_handle *h)
75 CTDB_DECREMENT_STAT(h->ctdb, pending_lockwait_calls);
76 kill(h->child, SIGKILL);
77 h->ctdb_db->pending_requests--;
78 DLIST_REMOVE(h->ctdb_db->lockwait_active, h);
82 static void lockwait_handler(struct event_context *ev, struct fd_event *fde,
83 uint16_t flags, void *private_data)
85 struct lockwait_handle *h = talloc_get_type(private_data,
86 struct lockwait_handle);
87 void (*callback)(void *) = h->callback;
88 void *p = h->private_data;
89 TDB_DATA key = h->key;
90 struct tdb_context *tdb = h->ctdb_db->ltdb->tdb;
91 TALLOC_CTX *tmp_ctx = talloc_new(ev);
93 key.dptr = talloc_memdup(tmp_ctx, key.dptr, key.dsize);
94 h->ctdb_db->pending_requests--;
96 CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "lockwait", lockwait_latency, h->start_time);
98 /* the handle needs to go away when the context is gone - when
99 the handle goes away this implicitly closes the pipe, which
100 kills the child holding the lock */
101 talloc_steal(tmp_ctx, h);
103 if (h->ctdb->flags & CTDB_FLAG_TORTURE) {
104 if (tdb_chainlock_nonblock(tdb, key) == 0) {
105 ctdb_fatal(h->ctdb, "got chain lock while lockwait child active");
109 tdb_chainlock_mark(tdb, key);
111 if (h->ctdb_db->lockwait_overflow) {
112 do_overflow(h->ctdb_db, key);
114 tdb_chainlock_unmark(tdb, key);
116 talloc_free(tmp_ctx);
120 static int overflow_lockwait_destructor(struct lockwait_handle *h)
122 CTDB_DECREMENT_STAT(h->ctdb, pending_lockwait_calls);
123 DLIST_REMOVE(h->ctdb_db->lockwait_overflow, h);
128 setup a non-blocking chainlock on a tdb record. If this function
129 returns NULL then it could not get the chainlock. Otherwise it
130 returns a opaque handle, and will call callback() once it has
131 managed to get the chainlock. You can cancel it by using talloc_free
132 on the returned handle.
134 It is the callers responsibility to unlock the chainlock once
137 struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
139 void (*callback)(void *private_data),
142 struct lockwait_handle *result, *i;
144 pid_t parent = getpid();
146 CTDB_INCREMENT_STAT(ctdb_db->ctdb, lockwait_calls);
147 CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
149 if (!(result = talloc_zero(private_data, struct lockwait_handle))) {
150 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
154 result->callback = callback;
155 result->private_data = private_data;
156 result->ctdb = ctdb_db->ctdb;
157 result->ctdb_db = ctdb_db;
160 /* If we already have a lockwait child for this request, then put this
161 request on the overflow queue straight away
163 for (i = ctdb_db->lockwait_active; i; i = i->next) {
164 if (key.dsize == i->key.dsize
165 && memcmp(key.dptr, i->key.dptr, key.dsize) == 0) {
166 DLIST_ADD_END(ctdb_db->lockwait_overflow, result, NULL);
167 talloc_set_destructor(result, overflow_lockwait_destructor);
172 /* Don't fire off too many children at once! */
173 if (ctdb_db->pending_requests > 200) {
174 DLIST_ADD_END(ctdb_db->lockwait_overflow, result, NULL);
175 talloc_set_destructor(result, overflow_lockwait_destructor);
176 DEBUG(DEBUG_DEBUG, (__location__ " Created overflow for %s\n",
181 ret = pipe(result->fd);
185 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
189 result->child = ctdb_fork(ctdb_db->ctdb);
191 if (result->child == (pid_t)-1) {
192 close(result->fd[0]);
193 close(result->fd[1]);
195 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
199 if (result->child == 0) {
201 close(result->fd[0]);
202 debug_extra = talloc_asprintf(NULL, "chainlock-%s:", ctdb_db->db_name);
203 tdb_chainlock(ctdb_db->ltdb->tdb, key);
204 write(result->fd[1], &c, 1);
205 /* make sure we die when our parent dies */
206 while (kill(parent, 0) == 0 || errno != ESRCH) {
212 close(result->fd[1]);
213 set_close_on_exec(result->fd[0]);
215 /* This is an active lockwait child process */
216 DLIST_ADD_END(ctdb_db->lockwait_active, result, NULL);
218 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child lockwait process\n", result->fd[0]));
220 ctdb_db->pending_requests++;
221 talloc_set_destructor(result, lockwait_destructor);
223 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
224 EVENT_FD_READ, lockwait_handler,
226 if (result->fde == NULL) {
228 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
231 tevent_fd_set_auto_close(result->fde);
233 result->start_time = timeval_current();