lib: Move pipe signalling to pthreadpool_pipe.c
[metze/samba/wip.git] / source3 / lib / pthreadpool / pthreadpool_pipe.c
index 76bafa2c3ffb94d843a8ba443a75498a4353e3b2..3eaf5e39bd9d544294040d4a456032fb79130377 100644 (file)
 
 struct pthreadpool_pipe {
        struct pthreadpool *pool;
+       pid_t pid;
+       int pipe_fds[2];
 };
 
+static int pthreadpool_pipe_signal(int jobid, void *private_data);
+
 int pthreadpool_pipe_init(unsigned max_threads,
                          struct pthreadpool_pipe **presult)
 {
-       struct pthreadpool_pipe *p;
+       struct pthreadpool_pipe *pool;
        int ret;
 
-       p = malloc(sizeof(struct pthreadpool_pipe));
-       if (p == NULL) {
+       pool = malloc(sizeof(struct pthreadpool_pipe));
+       if (pool == NULL) {
                return ENOMEM;
        }
+       pool->pid = getpid();
+
+       ret = pipe(pool->pipe_fds);
+       if (ret == -1) {
+               int err = errno;
+               free(pool);
+               return err;
+       }
 
-       ret = pthreadpool_init(max_threads, &p->pool);
+       ret = pthreadpool_init(max_threads, &pool->pool,
+                              pthreadpool_pipe_signal, pool);
        if (ret != 0) {
-               free(p);
+               close(pool->pipe_fds[0]);
+               close(pool->pipe_fds[1]);
+               free(pool);
                return ret;
        }
 
-       *presult = p;
+       *presult = pool;
+       return 0;
+}
+
+static int pthreadpool_pipe_signal(int jobid, void *private_data)
+{
+       struct pthreadpool_pipe *pool = private_data;
+       ssize_t written;
+
+       do {
+               written = write(pool->pipe_fds[1], &jobid, sizeof(jobid));
+       } while ((written == -1) && (errno == EINTR));
+
+       if (written != sizeof(jobid)) {
+               return errno;
+       }
+
        return 0;
 }
 
@@ -55,30 +86,91 @@ int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
        if (ret != 0) {
                return ret;
        }
+
+       close(pool->pipe_fds[0]);
+       pool->pipe_fds[0] = -1;
+
+       close(pool->pipe_fds[1]);
+       pool->pipe_fds[1] = -1;
+
        free(pool);
        return 0;
 }
 
+static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
+{
+       pid_t pid = getpid();
+       int signal_fd;
+       int ret;
+
+       if (pid == pool->pid) {
+               return 0;
+       }
+
+       signal_fd = pool->pipe_fds[0];
+
+       close(pool->pipe_fds[0]);
+       pool->pipe_fds[0] = -1;
+
+       close(pool->pipe_fds[1]);
+       pool->pipe_fds[1] = -1;
+
+       ret = pipe(pool->pipe_fds);
+       if (ret != 0) {
+               return errno;
+       }
+
+       ret = dup2(pool->pipe_fds[0], signal_fd);
+       if (ret != 0) {
+               return errno;
+       }
+
+       pool->pipe_fds[0] = signal_fd;
+
+       return 0;
+}
+
 int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
                             void (*fn)(void *private_data),
                             void *private_data)
 {
        int ret;
+
+       ret = pthreadpool_pipe_reinit(pool);
+       if (ret != 0) {
+               return ret;
+       }
+
        ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
        return ret;
 }
 
 int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
 {
-       int fd;
-       fd = pthreadpool_signal_fd(pool->pool);
-       return fd;
+       return pool->pipe_fds[0];
 }
 
 int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
                                   unsigned num_jobids)
 {
-       int ret;
-       ret = pthreadpool_finished_jobs(pool->pool, jobids, num_jobids);
-       return ret;
+       ssize_t to_read, nread;
+       pid_t pid = getpid();
+
+       if (pool->pid != pid) {
+               return EINVAL;
+       }
+
+       to_read = sizeof(int) * num_jobids;
+
+       do {
+               nread = read(pool->pipe_fds[0], jobids, to_read);
+       } while ((nread == -1) && (errno == EINTR));
+
+       if (nread == -1) {
+               return -errno;
+       }
+       if ((nread % sizeof(int)) != 0) {
+               return -EINVAL;
+       }
+       return nread / sizeof(int);
 }