remove metze_thread_fn
[metze/samba/wip.git] / lib / tevent / tevent_metze.c
1 /*
2    tevent event library.
3
4    Copyright (C) Stefan Metzmacher 2016
5
6      ** NOTE! The following LGPL license applies to the tevent
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "replace.h"
25 #include "system/filesys.h"
26 #include "talloc.h"
27 #include "tevent.h"
28 #include "tevent_internal.h"
29 #include "tevent_util.h"
30 #include "lib/util/dlinklist.h"
31
32 #include <pthread.h>
33
34 struct tevent_threadpool;
35
/*
 * A single unit of work for a tevent_threadpool.
 *
 * Instances are allocated by _tevent_threadpool_job_create() and torn
 * down by tevent_threadpool_job_destructor().
 */
struct tevent_threadpool_job {
        /* Doubly linked list linkage (DLIST_*) for tevent_threadpool.jobs. */
        struct tevent_threadpool_job *prev;
        struct tevent_threadpool_job *next;

        /*
         * Back references that are only non-NULL while the job is
         * attached: the job destructor unlinks from busy.pool->jobs and
         * marks busy.req as received.
         */
        struct {
                struct tevent_threadpool *pool;
                struct tevent_req *req;
        } busy;

        /*
         * Private bookkeeping. 'im' is created together with the job;
         * 'ret' is neither read nor written by the code in this file
         * yet (NOTE(review): presumably the job function's result).
         */
        struct {
                struct tevent_immediate *im;
                int ret;
        } internal;

        /*
         * Caller supplied description and the zero-initialised argument
         * buffer of desc->args_size bytes handed back to the caller at
         * creation time.
         */
        struct {
                const struct tevent_threadpool_job_description *desc;
                void *args;
        } state;
};
55
/*
 * A pool of worker threads bound to one tevent context.
 *
 * Created by _tevent_threadpool_create(), which also links the pool
 * onto ev->threads.pools.
 */
struct tevent_threadpool {
        /* Doubly linked list linkage (DLIST_*) for ev->threads.pools. */
        struct tevent_threadpool *prev;
        struct tevent_threadpool *next;

        /* The event context this pool was created on. */
        struct tevent_context *ev;

        /* Thread limit as passed to _tevent_threadpool_create(). */
        size_t max_threads;

        /* Head of the list of jobs attached to this pool. */
        struct tevent_threadpool_job *jobs;

        /* Queue named "job_queue"; stopped when the pool is destroyed. */
        struct tevent_queue *job_queue;
};
65
66 static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
67 {
68         tevent_queue_stop(pool->job_queue);
69         
70         //TODO DLIST_REMOVE all pool->jobs
71
72         return 0;
73 }
74
75 struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
76                                                     struct tevent_context *ev,
77                                                     int max_threads,
78                                                     const char *location)
79 {
80         struct tevent_threadpool *pool;
81
82         if (ev->wrapper.glue != NULL) {
83                 /*
84                  * stacking of wrappers is not supported
85                  */
86                 tevent_debug(ev->wrapper.glue->main_ev, TEVENT_DEBUG_FATAL,
87                              "%s: %s() stacking not allowed",
88                              __func__, location);
89                 errno = ENOSYS;
90                 return NULL;
91         }
92
93         pool = talloc_zero(mem_ctx, struct tevent_threadpool);
94         if (pool == NULL) {
95                 return NULL;
96         }
97         *pool = (struct tevent_threadpool) {
98                 .ev = ev,
99                 .max_threads = max_threads,
100         };
101
102         pool->job_queue = tevent_queue_create(pool, "job_queue");
103         if (pool->job_queue == NULL) {
104                 TALLOC_FREE(pool);
105                 return NULL;
106         }
107
108         DLIST_ADD_END(ev->threads.pools, pool);
109
110         talloc_set_destructor(pool, tevent_threadpool_destructor);
111         return pool;
112 }
113
114 static int tevent_threadpool_job_destructor(struct tevent_threadpool_job *job)
115 {
116         if (job->busy.req != NULL) {
117                 tevent_req_received(job->busy.req);
118                 job->busy.req = NULL;
119         }
120
121         if (job->busy.pool != NULL) {
122                 DLIST_REMOVE(job->busy.pool->jobs, job);
123                 job->busy.pool = NULL;
124         }
125
126         return 0;
127 }
128
129 struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
130                                                             const struct tevent_threadpool_job_description *desc,
131                                                             void *pargs,
132                                                             const char *location)
133 {
134         struct tevent_threadpool_job *job;
135         void **ppargs = (void **)pargs;
136         size_t payload;
137
138         payload = sizeof(struct tevent_immediate) + desc->args_size;
139         if (payload < sizeof(struct tevent_immediate)) {
140                 /* overflow */
141                 return NULL;
142         }
143
144         job = talloc_pooled_object(mem_ctx, struct tevent_threadpool_job,
145                                    2, payload);
146         if (job == NULL) {
147                 return NULL;
148         }
149
150         *job = (struct tevent_threadpool_job) {
151                 .internal = {
152                         .im = tevent_create_immediate(job),
153                 },
154                 .state = {
155                         .desc = desc,
156                         .args = talloc_zero_size(job, desc->args_size),
157                 },
158         };
159         talloc_set_name_const(job->state.args, desc->args_type);
160
161         talloc_set_destructor(job, tevent_threadpool_job_destructor);
162
163         *ppargs = job->state.args;
164         return job;
165 }
166
/*
 * Private state of a request created by tevent_threadpool_job_send().
 */
struct tevent_threadpool_job_state {
        /*
         * The job this request was created for; reset to NULL by
         * tevent_threadpool_job_cleanup() once the request has been
         * received.
         */
        struct tevent_threadpool_job *job;
};
170
171 static void tevent_threadpool_job_cleanup(struct tevent_req *req,
172                                           enum tevent_req_state req_state)
173 {
174         struct tevent_threadpool_job_state *state =
175                 tevent_req_data(req,
176                 struct tevent_threadpool_job_state);
177         struct tevent_threadpool_job *job = state->job;
178
179         if (job == NULL) {
180                 return;
181         }
182
183         switch (req_state) {
184         case TEVENT_REQ_RECEIVED:
185                 break;
186         default:
187                 return;
188         }
189
190         state->job = NULL;
191
192         job->busy.req = NULL;
193 }
194
195 struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
196                                               struct tevent_context *ev,
197                                               struct tevent_threadpool *pool,
198                                               struct tevent_threadpool_job *job)
199 {
200         struct tevent_req *req;
201         struct tevent_threadpool_job_state *state;
202
203         req = tevent_req_create(mem_ctx, &state,
204                                 struct tevent_threadpool_job_state);
205         if (req == NULL) {
206                 return NULL;
207         }
208         state->job = job;
209
210         tevent_req_set_cleanup_fn(req, tevent_threadpool_job_cleanup);
211
212         return req;
213 }
214
215 int tevent_threadpool_job_recv(struct tevent_req *req, int *perrno)
216 {
217         enum tevent_req_state req_state;
218         uint64_t error;
219
220         if (!tevent_req_is_error(req, &req_state, &error)) {
221                 tevent_req_received(req);
222                 return 0;
223         }
224
225         switch (req_state) {
226         case TEVENT_REQ_NO_MEMORY:
227                 *perrno = ENOMEM;
228                 break;
229         case TEVENT_REQ_TIMED_OUT:
230                 *perrno = ETIMEDOUT;
231                 break;
232         case TEVENT_REQ_USER_ERROR:
233                 *perrno = (int)error;
234                 break;
235         default:
236                 *perrno = EIO;
237                 break;
238         }
239
240         tevent_req_received(req);
241         return -1;
242 }