3 * Copyright (C) Volker Lendecke 2012
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "../pthreadpool/pthreadpool_pipe.h"
23 #include "lib/util/time.h"
24 #include "smbprofile.h"
/* Argument record for a pwrite job (members are not shown in this excerpt). */
26 struct asys_pwrite_args {
/* Argument record for a pread job (members are not shown in this excerpt). */
33 struct asys_pread_args {
/* Argument record for an fsync job (members are not shown in this excerpt). */
40 struct asys_fsync_args {
/* One member per operation type; callers reach them as job->args.<op>_args. */
45 struct asys_pwrite_args pwrite_args;
46 struct asys_pread_args pread_args;
47 struct asys_fsync_args fsync_args;
/* Operation-specific arguments stored inside a job. */
52 union asys_job_args args;
/* Timestamps written by the *_do workers just before and after the call. */
57 struct timespec start_time;
58 struct timespec end_time;
/* Thread pool the jobs are queued on via pthreadpool_pipe_add_job(). */
62 struct pthreadpool_pipe *pool;
/* Value of pthreadpool_pipe_signal_fd(pool); handed out by asys_signalfd(). */
63 int pthreadpool_pipe_fd;
/* Array of job pointers; a job id is an index into this array. */
66 struct asys_job **jobs;
/* NOTE(review): contents and users of this struct are not visible here. */
69 struct asys_creds_context {
/*
 * Set up a new asys context.
 *
 * Zero-allocates a struct asys_context, creates its thread pool by passing
 * max_parallel to pthreadpool_pipe_init(), and caches the pool's signal
 * descriptor in ctx->pthreadpool_pipe_fd.
 *
 * NOTE(review): the allocation/ret error checks, the store through pctx and
 * the return statement are not visible in this excerpt.
 */
73 int asys_context_init(struct asys_context **pctx, unsigned max_parallel)
75 struct asys_context *ctx;
/* calloc so every field starts out zero/NULL. */
78 ctx = calloc(1, sizeof(struct asys_context));
82 ret = pthreadpool_pipe_init(max_parallel, &ctx->pool);
/* Remember the descriptor once; asys_signalfd() just returns this value. */
87 ctx->pthreadpool_pipe_fd = pthreadpool_pipe_signal_fd(ctx->pool);
/*
 * Return the descriptor stored in ctx->pthreadpool_pipe_fd, i.e. the value
 * pthreadpool_pipe_signal_fd() produced for this context's pool.
 */
93 int asys_signalfd(struct asys_context *ctx)
95 return ctx->pthreadpool_pipe_fd;
/*
 * Tear down an asys context.
 *
 * Walks all job slots checking the busy flag, destroys the thread pool with
 * pthreadpool_pipe_destroy(), then walks the job slots a second time.
 *
 * NOTE(review): the action taken for a busy job, the handling of ret and
 * the body of the second loop are not visible in this excerpt.
 */
98 int asys_context_destroy(struct asys_context *ctx)
/* First pass: inspect every job's busy flag before touching the pool. */
103 for (i=0; i<ctx->num_jobs; i++) {
104 if (ctx->jobs[i]->busy) {
109 ret = pthreadpool_pipe_destroy(ctx->pool);
/* Second pass over the job slots (presumably releases them -- TODO confirm). */
113 for (i=0; i<ctx->num_jobs; i++) {
/*
 * Obtain a job slot in ctx and report its id through *jobid.
 *
 * Visible steps: scan the existing slots, refuse with EBUSY if adding one
 * more would wrap the job counter, grow the ctx->jobs pointer array by one
 * entry, zero-allocate a new struct asys_job and store it in the new last
 * slot. The id reported is the index of that slot.
 *
 * NOTE(review): the body of the scan loop (presumably reuse of a non-busy
 * slot), the NULL checks after realloc/calloc, the assignment of tmp back to
 * ctx->jobs, the num_jobs increment and the store through pjob are not
 * visible in this excerpt.
 */
121 static int asys_new_job(struct asys_context *ctx, int *jobid,
122 struct asys_job **pjob)
124 struct asys_job **tmp;
125 struct asys_job *job;
/* Look through the slots that already exist. */
128 for (i=0; i<ctx->num_jobs; i++) {
/* num_jobs+1 wrapping to 0 means the counter cannot grow any further. */
138 if (ctx->num_jobs+1 == 0) {
139 return EBUSY; /* overflow */
/* Result goes into tmp, not ctx->jobs, so the old array is not lost here. */
142 tmp = realloc(ctx->jobs, sizeof(struct asys_job *)*(ctx->num_jobs+1));
148 job = calloc(1, sizeof(struct asys_job));
/* The new job occupies the slot just added at the end of the array. */
152 ctx->jobs[ctx->num_jobs] = job;
154 *jobid = ctx->num_jobs;
/* Worker routine, defined after asys_pwrite(); declared here so it can be queued. */
160 static void asys_pwrite_do(void *private_data);

/*
 * Queue a pwrite on the context's thread pool.
 *
 * Gets a job slot from asys_new_job(), records the caller's private_data in
 * it, fills the job's pwrite_args and submits asys_pwrite_do with the job as
 * its argument under the slot's job id.
 *
 * NOTE(review): only the fildes and offset assignments are visible; buf and
 * nbyte are presumably stored on lines not shown. Error handling for ret and
 * the return statement are also not visible in this excerpt.
 */
162 int asys_pwrite(struct asys_context *ctx, int fildes, const void *buf,
163 size_t nbyte, off_t offset, void *private_data)
165 struct asys_job *job;
166 struct asys_pwrite_args *args;
170 ret = asys_new_job(ctx, &jobid, &job);
/* private_data is what asys_cancel() later compares against. */
174 job->private_data = private_data;
176 args = &job->args.pwrite_args;
177 args->fildes = fildes;
180 args->offset = offset;
182 ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pwrite_do, job);
/*
 * Job function for a queued pwrite; private_data is the struct asys_job.
 *
 * Calls pwrite() with the stored arguments, keeps its return value in
 * job->ret and brackets the call with PROFILE_TIMESTAMP into
 * job->start_time / job->end_time.
 *
 * NOTE(review): the body of the failure branch is not visible; presumably it
 * saves errno into job->err -- TODO confirm.
 */
191 static void asys_pwrite_do(void *private_data)
193 struct asys_job *job = (struct asys_job *)private_data;
194 struct asys_pwrite_args *args = &job->args.pwrite_args;
196 PROFILE_TIMESTAMP(&job->start_time);
197 job->ret = pwrite(args->fildes, args->buf, args->nbyte, args->offset);
198 PROFILE_TIMESTAMP(&job->end_time);
/* -1 is pwrite()'s failure return. */
200 if (job->ret == -1) {
/* Worker routine, defined after asys_pread(); declared here so it can be queued. */
205 static void asys_pread_do(void *private_data);

/*
 * Queue a pread on the context's thread pool.
 *
 * Gets a job slot from asys_new_job(), records the caller's private_data in
 * it, fills the job's pread_args and submits asys_pread_do with the job as
 * its argument under the slot's job id.
 *
 * NOTE(review): only the fildes and offset assignments are visible; buf and
 * nbyte are presumably stored on lines not shown. Error handling for ret and
 * the return statement are also not visible in this excerpt.
 */
207 int asys_pread(struct asys_context *ctx, int fildes, void *buf,
208 size_t nbyte, off_t offset, void *private_data)
210 struct asys_job *job;
211 struct asys_pread_args *args;
215 ret = asys_new_job(ctx, &jobid, &job);
/* private_data is what asys_cancel() later compares against. */
219 job->private_data = private_data;
221 args = &job->args.pread_args;
222 args->fildes = fildes;
225 args->offset = offset;
227 ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pread_do, job);
/*
 * Job function for a queued pread; private_data is the struct asys_job.
 *
 * Calls pread() with the stored arguments, keeps its return value in
 * job->ret and brackets the call with PROFILE_TIMESTAMP into
 * job->start_time / job->end_time.
 *
 * NOTE(review): the body of the failure branch is not visible; presumably it
 * saves errno into job->err -- TODO confirm.
 */
236 static void asys_pread_do(void *private_data)
238 struct asys_job *job = (struct asys_job *)private_data;
239 struct asys_pread_args *args = &job->args.pread_args;
241 PROFILE_TIMESTAMP(&job->start_time);
242 job->ret = pread(args->fildes, args->buf, args->nbyte, args->offset);
243 PROFILE_TIMESTAMP(&job->end_time);
/* -1 is pread()'s failure return. */
245 if (job->ret == -1) {
/* Worker routine, defined after asys_fsync(); declared here so it can be queued. */
250 static void asys_fsync_do(void *private_data);

/*
 * Queue an fsync on the context's thread pool.
 *
 * Gets a job slot from asys_new_job(), records the caller's private_data in
 * it, stores fildes in the job's fsync_args and submits asys_fsync_do with
 * the job as its argument under the slot's job id.
 *
 * NOTE(review): error handling for ret and the return statement are not
 * visible in this excerpt.
 */
252 int asys_fsync(struct asys_context *ctx, int fildes, void *private_data)
254 struct asys_job *job;
255 struct asys_fsync_args *args;
259 ret = asys_new_job(ctx, &jobid, &job);
/* private_data is what asys_cancel() later compares against. */
263 job->private_data = private_data;
265 args = &job->args.fsync_args;
266 args->fildes = fildes;
268 ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_fsync_do, job);
/*
 * Job function for a queued fsync; private_data is the struct asys_job.
 *
 * Calls fsync() on the stored descriptor, keeps its return value in job->ret
 * and brackets the call with PROFILE_TIMESTAMP into job->start_time /
 * job->end_time.
 *
 * NOTE(review): the body of the failure branch is not visible; presumably it
 * saves errno into job->err -- TODO confirm.
 */
277 static void asys_fsync_do(void *private_data)
279 struct asys_job *job = (struct asys_job *)private_data;
280 struct asys_fsync_args *args = &job->args.fsync_args;
282 PROFILE_TIMESTAMP(&job->start_time);
283 job->ret = fsync(args->fildes);
284 PROFILE_TIMESTAMP(&job->end_time);
/* -1 is fsync()'s failure return. */
286 if (job->ret == -1) {
/*
 * Locate jobs belonging to a caller: walks every job slot in ctx and selects
 * those whose private_data pointer equals the one passed in.
 *
 * NOTE(review): what is done to a matching job is not visible in this
 * excerpt; presumably it is flagged as canceled -- TODO confirm.
 */
291 void asys_cancel(struct asys_context *ctx, void *private_data)
295 for (i=0; i<ctx->num_jobs; i++) {
296 struct asys_job *job = ctx->jobs[i];
/* Match by pointer identity with the value stored when the job was queued. */
298 if (job->private_data == private_data) {
304 int asys_results(struct asys_context *ctx, struct asys_result *results,
305 unsigned num_results)
307 int jobids[num_results];
310 ret = pthreadpool_pipe_finished_jobs(ctx->pool, jobids, num_results);
315 for (i=0; i<ret; i++) {
316 struct asys_result *result = &results[i];
317 struct asys_job *job;
322 if ((jobid < 0) || (jobid >= ctx->num_jobs)) {
326 job = ctx->jobs[jobid];
330 result->err = ECANCELED;
332 result->ret = job->ret;
333 result->err = job->err;
335 result->private_data = job->private_data;
336 result->duration = nsec_time_diff(&job->end_time, &job->start_time);