253a8673518da6f7f53a430c02dcf748a7eaeaf0
[samba.git] / lib / pthreadpool / pthreadpool_tevent.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * threadpool implementation based on pthreads
4  * Copyright (C) Volker Lendecke 2009,2011
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "pthreadpool_tevent.h"
22 #include "pthreadpool.h"
23 #include "lib/util/tevent_unix.h"
24 #include "lib/util/dlinklist.h"
25
26 struct pthreadpool_tevent_job_state;
27
/*
 * tevent-facing wrapper object around a pthreadpool.
 */
struct pthreadpool_tevent {
        /*
         * The wrapped thread pool. Reset to NULL by the talloc
         * destructor once pthreadpool_destroy() has succeeded.
         */
        struct pthreadpool *pool;
        /*
         * Doubly linked list of jobs that were handed to the pool and
         * have not been reported as done yet.
         */
        struct pthreadpool_tevent_job_state *jobs;
};
33
/*
 * Per-job bookkeeping; this is the tevent_req state of a request created
 * by pthreadpool_tevent_job_send().
 */
struct pthreadpool_tevent_job_state {
        /* Linkage for the owning pool's "jobs" list. */
        struct pthreadpool_tevent_job_state *prev;
        struct pthreadpool_tevent_job_state *next;

        /*
         * Owning pool while the job is pending; NULL once the job has
         * completed or the pool has been torn down.
         */
        struct pthreadpool_tevent *pool;

        /* Event context the completion immediate is scheduled on. */
        struct tevent_context *ev;

        /* Threaded context; only created when HAVE_PTHREAD is defined. */
        struct tevent_threaded_context *tctx;

        /* Immediate event used to deliver the completion notification. */
        struct tevent_immediate *im;

        /*
         * The request this state belongs to; set to NULL when the request
         * is freed while the job is still pending.
         */
        struct tevent_req *req;

        /* Caller supplied work function and its argument. */
        void (*fn)(void *private_data);
        void *private_data;
};
45
46 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
47
48 static int pthreadpool_tevent_job_signal(int jobid,
49                                          void (*job_fn)(void *private_data),
50                                          void *job_private_data,
51                                          void *private_data);
52
53 int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
54                             struct pthreadpool_tevent **presult)
55 {
56         struct pthreadpool_tevent *pool;
57         int ret;
58
59         pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
60         if (pool == NULL) {
61                 return ENOMEM;
62         }
63
64         ret = pthreadpool_init(max_threads, &pool->pool,
65                                pthreadpool_tevent_job_signal, pool);
66         if (ret != 0) {
67                 TALLOC_FREE(pool);
68                 return ret;
69         }
70
71         talloc_set_destructor(pool, pthreadpool_tevent_destructor);
72
73         *presult = pool;
74         return 0;
75 }
76
77 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
78 {
79         struct pthreadpool_tevent_job_state *state, *next;
80         int ret;
81
82         ret = pthreadpool_destroy(pool->pool);
83         if (ret != 0) {
84                 return ret;
85         }
86         pool->pool = NULL;
87
88         for (state = pool->jobs; state != NULL; state = next) {
89                 next = state->next;
90                 DLIST_REMOVE(pool->jobs, state);
91                 state->pool = NULL;
92         }
93
94         return 0;
95 }
96
97 static void pthreadpool_tevent_job_fn(void *private_data);
98 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
99                                         struct tevent_immediate *im,
100                                         void *private_data);
101
102 static int pthreadpool_tevent_job_state_destructor(
103         struct pthreadpool_tevent_job_state *state)
104 {
105         if (state->pool == NULL) {
106                 return 0;
107         }
108
109         /*
110          * We should never be called with state->req == NULL,
111          * state->pool must be cleared before the 2nd talloc_free().
112          */
113         if (state->req == NULL) {
114                 abort();
115         }
116
117         /*
118          * We need to reparent to a long term context.
119          */
120         (void)talloc_reparent(state->req, NULL, state);
121         state->req = NULL;
122         return -1;
123 }
124
125 struct tevent_req *pthreadpool_tevent_job_send(
126         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
127         struct pthreadpool_tevent *pool,
128         void (*fn)(void *private_data), void *private_data)
129 {
130         struct tevent_req *req;
131         struct pthreadpool_tevent_job_state *state;
132         int ret;
133
134         req = tevent_req_create(mem_ctx, &state,
135                                 struct pthreadpool_tevent_job_state);
136         if (req == NULL) {
137                 return NULL;
138         }
139         state->pool = pool;
140         state->ev = ev;
141         state->req = req;
142         state->fn = fn;
143         state->private_data = private_data;
144
145         state->im = tevent_create_immediate(state);
146         if (tevent_req_nomem(state->im, req)) {
147                 return tevent_req_post(req, ev);
148         }
149
150 #ifdef HAVE_PTHREAD
151         state->tctx = tevent_threaded_context_create(state, ev);
152         if (state->tctx == NULL && errno == ENOSYS) {
153                 /*
154                  * Samba build with pthread support but
155                  * tevent without???
156                  */
157                 tevent_req_error(req, ENOSYS);
158                 return tevent_req_post(req, ev);
159         }
160         if (tevent_req_nomem(state->tctx, req)) {
161                 return tevent_req_post(req, ev);
162         }
163 #endif
164
165         ret = pthreadpool_add_job(pool->pool, 0,
166                                   pthreadpool_tevent_job_fn,
167                                   state);
168         if (tevent_req_error(req, ret)) {
169                 return tevent_req_post(req, ev);
170         }
171
172         /*
173          * Once the job is scheduled, we need to protect
174          * our memory.
175          */
176         talloc_set_destructor(state, pthreadpool_tevent_job_state_destructor);
177
178         DLIST_ADD_END(pool->jobs, state);
179
180         return req;
181 }
182
183 static void pthreadpool_tevent_job_fn(void *private_data)
184 {
185         struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
186                 private_data, struct pthreadpool_tevent_job_state);
187         state->fn(state->private_data);
188 }
189
190 static int pthreadpool_tevent_job_signal(int jobid,
191                                          void (*job_fn)(void *private_data),
192                                          void *job_private_data,
193                                          void *private_data)
194 {
195         struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
196                 job_private_data, struct pthreadpool_tevent_job_state);
197
198         if (state->tctx != NULL) {
199                 /* with HAVE_PTHREAD */
200                 tevent_threaded_schedule_immediate(state->tctx, state->im,
201                                                    pthreadpool_tevent_job_done,
202                                                    state);
203         } else {
204                 /* without HAVE_PTHREAD */
205                 tevent_schedule_immediate(state->im, state->ev,
206                                           pthreadpool_tevent_job_done,
207                                           state);
208         }
209
210         return 0;
211 }
212
213 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
214                                         struct tevent_immediate *im,
215                                         void *private_data)
216 {
217         struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
218                 private_data, struct pthreadpool_tevent_job_state);
219
220         if (state->pool != NULL) {
221                 DLIST_REMOVE(state->pool->jobs, state);
222                 state->pool = NULL;
223         }
224
225         TALLOC_FREE(state->tctx);
226
227         if (state->req == NULL) {
228                 /*
229                  * There was a talloc_free() state->req
230                  * while the job was pending,
231                  * which mean we're reparented on a longterm
232                  * talloc context.
233                  *
234                  * We just cleanup here...
235                  */
236                 talloc_free(state);
237                 return;
238         }
239
240         tevent_req_done(state->req);
241 }
242
/*
 * Collect the result of a request created by
 * pthreadpool_tevent_job_send().
 *
 * Returns the value of tevent_req_simple_recv_unix() for the request.
 */
int pthreadpool_tevent_job_recv(struct tevent_req *req)
{
        int result = tevent_req_simple_recv_unix(req);

        return result;
}