struct pthreadpool_pipe {
struct pthreadpool *pool;
+ pid_t pid;
+ int pipe_fds[2];
};
+static int pthreadpool_pipe_signal(int jobid, void *private_data);
+
int pthreadpool_pipe_init(unsigned max_threads,
struct pthreadpool_pipe **presult)
{
- struct pthreadpool_pipe *p;
+ struct pthreadpool_pipe *pool;
int ret;
- p = malloc(sizeof(struct pthreadpool_pipe));
- if (p == NULL) {
+ pool = malloc(sizeof(struct pthreadpool_pipe));
+ if (pool == NULL) {
return ENOMEM;
}
+ pool->pid = getpid();
+
+ ret = pipe(pool->pipe_fds);
+ if (ret == -1) {
+ int err = errno;
+ free(pool);
+ return err;
+ }
- ret = pthreadpool_init(max_threads, &p->pool);
+ ret = pthreadpool_init(max_threads, &pool->pool,
+ pthreadpool_pipe_signal, pool);
if (ret != 0) {
- free(p);
+ close(pool->pipe_fds[0]);
+ close(pool->pipe_fds[1]);
+ free(pool);
return ret;
}
- *presult = p;
+ *presult = pool;
+ return 0;
+}
+
+static int pthreadpool_pipe_signal(int jobid, void *private_data)
+{
+ struct pthreadpool_pipe *pool = private_data;
+ ssize_t written;
+
+ do {
+ written = write(pool->pipe_fds[1], &jobid, sizeof(jobid));
+ } while ((written == -1) && (errno == EINTR));
+
+ if (written != sizeof(jobid)) {
+ return errno;
+ }
+
return 0;
}
if (ret != 0) {
return ret;
}
+
+ close(pool->pipe_fds[0]);
+ pool->pipe_fds[0] = -1;
+
+ close(pool->pipe_fds[1]);
+ pool->pipe_fds[1] = -1;
+
free(pool);
return 0;
}
+static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
+{
+ pid_t pid = getpid();
+ int signal_fd;
+ int ret;
+
+ if (pid == pool->pid) {
+ return 0;
+ }
+
+ signal_fd = pool->pipe_fds[0];
+
+ close(pool->pipe_fds[0]);
+ pool->pipe_fds[0] = -1;
+
+ close(pool->pipe_fds[1]);
+ pool->pipe_fds[1] = -1;
+
+ ret = pipe(pool->pipe_fds);
+ if (ret != 0) {
+ return errno;
+ }
+
+ ret = dup2(pool->pipe_fds[0], signal_fd);
+ if (ret != 0) {
+ return errno;
+ }
+
+ pool->pipe_fds[0] = signal_fd;
+
+ return 0;
+}
+
int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
void (*fn)(void *private_data),
void *private_data)
{
int ret;
+
+ ret = pthreadpool_pipe_reinit(pool);
+ if (ret != 0) {
+ return ret;
+ }
+
ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
return ret;
}
int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
{
- int fd;
- fd = pthreadpool_signal_fd(pool->pool);
- return fd;
+ return pool->pipe_fds[0];
}
int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
unsigned num_jobids)
{
- int ret;
- ret = pthreadpool_finished_jobs(pool->pool, jobids, num_jobids);
- return ret;
+ ssize_t to_read, nread;
+ pid_t pid = getpid();
+
+ if (pool->pid != pid) {
+ return EINVAL;
+ }
+
+ to_read = sizeof(int) * num_jobids;
+
+ do {
+ nread = read(pool->pipe_fds[0], jobids, to_read);
+ } while ((nread == -1) && (errno == EINTR));
+
+ if (nread == -1) {
+ return -errno;
+ }
+ if ((nread % sizeof(int)) != 0) {
+ return -EINVAL;
+ }
+ return nread / sizeof(int);
}