pthreadpool: Allow multiple jobs to be received
authorVolker Lendecke <vl@samba.org>
Mon, 24 Mar 2014 10:39:56 +0000 (10:39 +0000)
committerJeremy Allison <jra@samba.org>
Thu, 27 Mar 2014 05:06:11 +0000 (06:06 +0100)
This can avoid syscalls when multiple jobs are finished simultaneously

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/lib/asys/asys.c
source3/lib/fncall.c
source3/lib/pthreadpool/pthreadpool.c
source3/lib/pthreadpool/pthreadpool.h
source3/lib/pthreadpool/pthreadpool_sync.c
source3/lib/pthreadpool/tests.c
source3/modules/vfs_aio_pthread.c
source3/torture/bench_pthreadpool.c

index 9937d2482d54eb572fc69ef02d95b606d9814825..1fd7700f9bfb76e332c15aa471569046f79985dc 100644 (file)
@@ -295,9 +295,9 @@ int asys_result(struct asys_context *ctx, ssize_t *pret, int *perrno,
        struct asys_job *job;
        int ret, jobid;
 
-       ret = pthreadpool_finished_job(ctx->pool, &jobid);
-       if (ret != 0) {
-               return ret;
+       ret = pthreadpool_finished_jobs(ctx->pool, &jobid, 1);
+       if (ret < 0) {
+               return -ret;
        }
        if ((jobid < 0) || (jobid >= ctx->num_jobs)) {
                return EIO;
index 7f728ba015839b3792931349bf6adc07268b85d7..88304d6961cb9e931479ce3eb4c7a4d4df71ddce 100644 (file)
@@ -287,7 +287,7 @@ static void fncall_handler(struct tevent_context *ev, struct tevent_fd *fde,
        int i, num_pending;
        int job_id;
 
-       if (pthreadpool_finished_job(ctx->pool, &job_id) != 0) {
+       if (pthreadpool_finished_jobs(ctx->pool, &job_id, 1) < 0) {
                return;
        }
 
index d51e8083601a6ebd61e5f8c0826279fabe1f601f..4436ab3289fb6a6879573e48c7039b028fb1616e 100644 (file)
@@ -288,25 +288,26 @@ static void pthreadpool_join_children(struct pthreadpool *pool)
  * Fetch a finished job number from the signal pipe
  */
 
-int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid)
+int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
+                             unsigned num_jobids)
 {
-       int ret_jobid;
-       ssize_t nread;
+       ssize_t to_read, nread;
 
        nread = -1;
        errno = EINTR;
 
+       to_read = sizeof(int) * num_jobids;
+
        while ((nread == -1) && (errno == EINTR)) {
-               nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int));
+               nread = read(pool->sig_pipe[0], jobids, to_read);
        }
        if (nread == -1) {
-               return errno;
+               return -errno;
        }
-       if (nread != sizeof(int)) {
-               return EINVAL;
+       if ((nread % sizeof(int)) != 0) {
+               return -EINVAL;
        }
-       *jobid = ret_jobid;
-       return 0;
+       return nread / sizeof(int);
 }
 
 /*
index fac2d25424910b2e700010e5d2113d0456e251ad..adb825a528a06f15abb5753430a31ae1bffbdef2 100644 (file)
@@ -61,7 +61,7 @@ int pthreadpool_destroy(struct pthreadpool *pool);
  *
  * This adds a job to a pthreadpool. The job can be identified by
  * job_id. This integer will be returned from
- * pthreadpool_finished_job() then the job is completed.
+ * pthreadpool_finished_jobs() then the job is completed.
  *
  * @param[in]  pool            The pool to run the job on
  * @param[in]  job_id          A custom identifier
@@ -84,15 +84,18 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 int pthreadpool_signal_fd(struct pthreadpool *pool);
 
 /**
- * @brief Get the job_id of a finished job
+ * @brief Get the job_ids of finished jobs
  *
  * This blocks until a job has finished unless the fd returned by
  * pthreadpool_signal_fd() is readable.
  *
  * @param[in]  pool            The pool to query for finished jobs
- * @param[out]  pjobid         The job_id of the finished job
- * @return                     success: 0, failure: errno
+ * @param[out]  jobids         The job_ids of the finished job
+ * @param[int]  num_jobids      The job_ids array size
+ * @return                     success: >=0, number of finished jobs
+ *                              failure: -errno
  */
-int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid);
+int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
+                             unsigned num_jobids);
 
 #endif
index 0c2d12fef30fdf03210f94210d8174d37bc7921c..5f06cae2f8c26d29782191c3235e918f302fba7d 100644 (file)
@@ -133,27 +133,35 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 
 }
 
-int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid)
+int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
+                             unsigned num_jobids)
 {
-       int ret_jobid;
-       ssize_t nread;
+       ssize_t to_read, nread;
+       int ret;
 
        nread = -1;
        errno = EINTR;
 
+       to_read = sizeof(int) * num_jobids;
+
        while ((nread == -1) && (errno == EINTR)) {
-               nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int));
+               nread = read(pool->sig_pipe[0], jobids, to_read);
        }
        if (nread == -1) {
-               return errno;
+               return -errno;
        }
-       if (nread != sizeof(int)) {
-               return EINVAL;
+       if ((nread % sizeof(int)) != 0) {
+               return -EINVAL;
        }
-       *jobid = ret_jobid;
 
        pool->pipe_busy = 0;
-       return pthreadpool_write_to_pipe(pool);
+
+       ret = pthreadpool_write_to_pipe(pool);
+       if (ret != 0) {
+               return -ret;
+       }
+
+       return nread / sizeof(int);
 }
 
 int pthreadpool_destroy(struct pthreadpool *pool)
index 170cedf07f79c2ba2b552b679755ae5c60e7ed8b..847471297fabfd65714d06e10ad246ad92a00bbc 100644 (file)
@@ -71,8 +71,8 @@ static int test_jobs(int num_threads, int num_jobs)
 
        for (i=0; i<num_jobs; i++) {
                int jobid = -1;
-               ret = pthreadpool_finished_job(p, &jobid);
-               if ((ret != 0) || (jobid >= num_jobs)) {
+               ret = pthreadpool_finished_jobs(p, &jobid, 1);
+               if ((ret != 1) || (jobid >= num_jobs)) {
                        fprintf(stderr, "invalid job number %d\n", jobid);
                        return -1;
                }
@@ -284,8 +284,8 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
                                continue;
                        }
 
-                       ret = pthreadpool_finished_job(pools[j], &jobid);
-                       if ((ret != 0) || (jobid >= num_jobs * num_threads)) {
+                       ret = pthreadpool_finished_jobs(pools[j], &jobid, 1);
+                       if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
                                fprintf(stderr, "invalid job number %d\n",
                                        jobid);
                                return -1;
index f7756b946c9667e2d533d2f2315599fab9ada177..de114d1292e7f826f148c1e9373f9420a8ecf359 100644 (file)
@@ -166,8 +166,8 @@ static void aio_open_handle_completion(struct tevent_context *event_ctx,
                return;
        }
 
-       ret = pthreadpool_finished_job(open_pool, &jobid);
-       if (ret) {
+       ret = pthreadpool_finished_jobs(open_pool, &jobid, 1);
+       if (ret != 1) {
                smb_panic("aio_open_handle_completion");
                /* notreached. */
                return;
index ee0d2036b64662949d34ad526027d0a9eeb11f62..247063d969463aae0d346cf1061ac6f3e1803cbc 100644 (file)
@@ -50,15 +50,15 @@ bool run_bench_pthreadpool(int dummy)
                                  strerror(ret));
                        break;
                }
-               ret = pthreadpool_finished_job(pool, &jobid);
-               if (ret != 0) {
+               ret = pthreadpool_finished_jobs(pool, &jobid, 1);
+               if (ret < 0) {
                        d_fprintf(stderr, "pthreadpool_finished_job failed: %s\n",
-                                 strerror(ret));
+                                 strerror(-ret));
                        break;
                }
        }
 
        pthreadpool_destroy(pool);
 
-       return (ret == 0);
+       return (ret == 1);
 }