Add ctdb_fork(0 which will fork a child process and drop the real-time
[sahlberg/ctdb.git] / server / ctdb_lockwait.c
1 /* 
2    wait for a tdb chain lock
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "db_wrap.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "../include/ctdb_private.h"
27
28
29 struct lockwait_handle {
30         struct ctdb_context *ctdb;
31         struct ctdb_db_context *ctdb_db;
32         struct fd_event *fde;
33         int fd[2];
34         pid_t child;
35         void *private_data;
36         void (*callback)(void *);
37         TDB_DATA key;
38         struct timeval start_time;
39 };
40
41 static void lockwait_handler(struct event_context *ev, struct fd_event *fde, 
42                              uint16_t flags, void *private_data)
43 {
44         struct lockwait_handle *h = talloc_get_type(private_data, 
45                                                      struct lockwait_handle);
46         void (*callback)(void *) = h->callback;
47         void *p = h->private_data;
48         pid_t child = h->child;
49         TDB_DATA key = h->key;
50         struct tdb_context *tdb = h->ctdb_db->ltdb->tdb;
51         TALLOC_CTX *tmp_ctx = talloc_new(ev);
52
53         key.dptr = talloc_memdup(tmp_ctx, key.dptr, key.dsize);
54
55         talloc_set_destructor(h, NULL);
56         CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "lockwait", lockwait_latency, h->start_time);
57         CTDB_DECREMENT_STAT(h->ctdb, pending_lockwait_calls);
58
59         /* the handle needs to go away when the context is gone - when
60            the handle goes away this implicitly closes the pipe, which
61            kills the child holding the lock */
62         talloc_steal(tmp_ctx, h);
63
64         if (h->ctdb->flags & CTDB_FLAG_TORTURE) {
65                 if (tdb_chainlock_nonblock(tdb, key) == 0) {
66                         ctdb_fatal(h->ctdb, "got chain lock while lockwait child active");
67                 }
68         }
69
70         tdb_chainlock_mark(tdb, key);
71         callback(p);
72         tdb_chainlock_unmark(tdb, key);
73
74         kill(child, SIGKILL);
75         talloc_free(tmp_ctx);
76 }
77
78 static int lockwait_destructor(struct lockwait_handle *h)
79 {
80         CTDB_DECREMENT_STAT(h->ctdb, pending_lockwait_calls);
81         kill(h->child, SIGKILL);
82         return 0;
83 }
84
85 /*
86   setup a non-blocking chainlock on a tdb record. If this function
87   returns NULL then it could not get the chainlock. Otherwise it
88   returns a opaque handle, and will call callback() once it has
89   managed to get the chainlock. You can cancel it by using talloc_free
90   on the returned handle.
91
92   It is the callers responsibility to unlock the chainlock once
93   acquired
94  */
95 struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
96                                       TDB_DATA key,
97                                       void (*callback)(void *private_data),
98                                       void *private_data)
99 {
100         struct lockwait_handle *result;
101         int ret;
102         pid_t parent = getpid();
103
104         CTDB_INCREMENT_STAT(ctdb_db->ctdb, lockwait_calls);
105         CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
106
107         if (!(result = talloc_zero(private_data, struct lockwait_handle))) {
108                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
109                 return NULL;
110         }
111
112         ret = pipe(result->fd);
113
114         if (ret != 0) {
115                 talloc_free(result);
116                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
117                 return NULL;
118         }
119
120         result->child = ctdb_fork(ctdb_db->ctdb);
121
122         if (result->child == (pid_t)-1) {
123                 close(result->fd[0]);
124                 close(result->fd[1]);
125                 talloc_free(result);
126                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
127                 return NULL;
128         }
129
130         result->callback = callback;
131         result->private_data = private_data;
132         result->ctdb = ctdb_db->ctdb;
133         result->ctdb_db = ctdb_db;
134         result->key = key;
135
136         if (result->child == 0) {
137                 char c = 0;
138                 close(result->fd[0]);
139                 debug_extra = talloc_asprintf(NULL, "chainlock-%s:", ctdb_db->db_name);
140                 tdb_chainlock(ctdb_db->ltdb->tdb, key);
141                 write(result->fd[1], &c, 1);
142                 /* make sure we die when our parent dies */
143                 while (kill(parent, 0) == 0 || errno != ESRCH) {
144                         sleep(5);
145                 }
146                 _exit(0);
147         }
148
149         close(result->fd[1]);
150         set_close_on_exec(result->fd[0]);
151
152         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child lockwait process\n", result->fd[0]));
153
154         talloc_set_destructor(result, lockwait_destructor);
155
156         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
157                                    EVENT_FD_READ, lockwait_handler,
158                                    (void *)result);
159         if (result->fde == NULL) {
160                 talloc_free(result);
161                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
162                 return NULL;
163         }
164         tevent_fd_set_auto_close(result->fde);
165
166         result->start_time = timeval_current();
167
168         return result;
169 }