#include "../include/thpool.h"

#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/*
 * Allocate a work item that wraps `func` and its argument.
 *
 * Returns the new item, or NULL when the allocation fails. The caller owns
 * the item and releases it with lm_thpool_free().
 */
lm_thwork_t *lm_thpool_work(lm_thfunc_t func, void *arg)
{
  lm_thwork_t *work = malloc(sizeof *work);

  if (work == NULL) {
    return NULL;
  }

  work->next = NULL;
  work->func = func;
  work->arg = arg;
  return work;
}

/*
 * Release a work item created by lm_thpool_work(). Passing NULL is a no-op.
 */
void lm_thpool_free(lm_thwork_t *work)
{
  free(work);
}

/*
 * Detach and return the item at the head of the queue, or NULL when the
 * queue is empty. The queue is not locked here: the worker calls this while
 * holding tp->mutex, and any other caller has to do the same.
 */
lm_thwork_t *lm_thpool_get(lm_thpool_t *tp)
{
  lm_thwork_t *work = tp->first;

  if (work == NULL) {
    return NULL;
  }

  tp->first = work->next;
  if (tp->first == NULL) {
    /* That was the only item, so the tail pointer is stale now. */
    tp->last = NULL;
  }
  return work;
}

/*
 * Thread entry point for a pool worker; `arg` is the lm_thpool_t it serves.
 *
 * Sleeps on work_lock until work is queued or the pool is stopped, runs one
 * item at a time outside the mutex, and on stop decrements tp->all and
 * notifies thread_lock so that the stopping thread can finish the teardown.
 * Always returns NULL.
 */
void *lm_thpool_worker(void *arg)
{
  lm_thpool_t *tp = arg;

  while (true) {
    pthread_mutex_lock(&tp->mutex);

    while (tp->first == NULL && !tp->stop) {
      pthread_cond_wait(&tp->work_lock, &tp->mutex);
    }

    if (tp->stop) {
      break; /* leaves the loop with the mutex still held */
    }

    lm_thwork_t *work = lm_thpool_get(tp);
    tp->active++;
    pthread_mutex_unlock(&tp->mutex);

    if (work != NULL) {
      work->func(work->arg);
      lm_thpool_free(work);
    }

    pthread_mutex_lock(&tp->mutex);
    tp->active--;
    /* Signal thread_lock once nothing is running and nothing is queued. */
    if (!tp->stop && tp->active == 0 && tp->first == NULL) {
      pthread_cond_signal(&tp->thread_lock);
    }
    pthread_mutex_unlock(&tp->mutex);
  }

  /* Still holding the mutex: report this worker's exit. Broadcast so the
   * thread performing the shutdown is woken even if others wait as well. */
  tp->all--;
  pthread_cond_broadcast(&tp->thread_lock);
  pthread_mutex_unlock(&tp->mutex);
  return NULL;
}

/*
 * Mark the pool as stopped, wait until every worker has exited, then destroy
 * the mutex and both condition variables.
 *
 * Must be entered with tp->mutex held; the mutex is released (and destroyed)
 * before returning. Waiting for tp->all to reach zero guarantees that no
 * worker still needs the synchronisation objects when they are destroyed.
 */
static void lm_thpool_shutdown_locked(lm_thpool_t *tp)
{
  tp->stop = true;
  pthread_cond_broadcast(&tp->work_lock);

  while (tp->all > 0) {
    pthread_cond_wait(&tp->thread_lock, &tp->mutex);
  }

  pthread_mutex_unlock(&tp->mutex);
  pthread_mutex_destroy(&tp->mutex);
  pthread_cond_destroy(&tp->work_lock);
  pthread_cond_destroy(&tp->thread_lock);
}

/*
 * Initialise `tp` and start `n` detached worker threads.
 *
 * Returns true when the pool is ready (for n <= 0 that is a pool without
 * workers). Returns false when a synchronisation object or a thread could
 * not be created; in that case any workers that did start have been shut
 * down again and `tp` must not be used or passed to lm_thpool_stop().
 */
bool lm_thpool_init(lm_thpool_t *tp, int n)
{
  memset(tp, 0, sizeof *tp);
  tp->first = NULL;
  tp->last = NULL;

  if (pthread_mutex_init(&tp->mutex, NULL) != 0) {
    return false;
  }
  if (pthread_cond_init(&tp->work_lock, NULL) != 0) {
    pthread_mutex_destroy(&tp->mutex);
    return false;
  }
  if (pthread_cond_init(&tp->thread_lock, NULL) != 0) {
    pthread_cond_destroy(&tp->work_lock);
    pthread_mutex_destroy(&tp->mutex);
    return false;
  }

  /* Hold the mutex while spawning so tp->all only ever counts threads that
   * really exist; new workers simply block on the mutex until we are done. */
  pthread_mutex_lock(&tp->mutex);
  for (int i = 0; i < n; i++) {
    pthread_t handle;

    if (pthread_create(&handle, NULL, lm_thpool_worker, tp) != 0) {
      /* Could not start every worker: undo what was started so far. */
      lm_thpool_shutdown_locked(tp);
      return false;
    }
    pthread_detach(handle);
    tp->all++;
  }
  pthread_mutex_unlock(&tp->mutex);

  return true;
}

/*
 * Queue `func(arg)` for execution by a worker.
 *
 * Returns false if the work item could not be allocated, true otherwise.
 */
bool lm_thpool_add(lm_thpool_t *tp, lm_thfunc_t func, void *arg)
{
  lm_thwork_t *work = lm_thpool_work(func, arg);

  if (work == NULL) {
    return false;
  }

  pthread_mutex_lock(&tp->mutex);
  if (tp->first == NULL) {
    tp->first = work;
    tp->last = work;
  } else {
    tp->last->next = work;
    tp->last = work;
  }
  pthread_cond_broadcast(&tp->work_lock);
  pthread_mutex_unlock(&tp->mutex);

  return true;
}

/*
 * Stop the pool: discard all work that has not started yet, let the workers
 * finish the items they are currently running, wait for every worker to
 * exit and destroy the synchronisation objects.
 *
 * Blocks until the last worker is gone, so it must not be called from inside
 * a task running on this pool. The pool must be re-initialised with
 * lm_thpool_init() before it is used again.
 */
void lm_thpool_stop(lm_thpool_t *tp)
{
  pthread_mutex_lock(&tp->mutex);

  /* Free every queued item; grab the successor before freeing the node. */
  lm_thwork_t *work = tp->first;
  while (work != NULL) {
    lm_thwork_t *next = work->next;
    lm_thpool_free(work);
    work = next;
  }
  tp->first = NULL;
  tp->last = NULL;

  lm_thpool_shutdown_locked(tp);
}