From 92bc029d0a4bb955ef022c0102ab6e0e150c09fd Mon Sep 17 00:00:00 2001 From: ngn Date: Tue, 25 Jun 2024 21:21:15 +0300 Subject: [PATCH] new: add pool server handler --- examples/server/main.c | 2 +- include/libmp.h | 2 +- include/mptp.h | 7 +- include/pool.h | 15 ++++- include/thpool.h | 28 ++++++++ include/types.h | 1 + locale/tr/LC_MESSAGES/libmp.po | 4 +- src/error.c | 2 +- src/libmp.c | 58 +++++++++------- src/mptp.c | 39 ++++++++--- src/pool.c | 57 ++++++++++++++-- src/thpool.c | 120 +++++++++++++++++++++++++++++++++ src/url.c | 2 + 13 files changed, 289 insertions(+), 48 deletions(-) create mode 100644 include/thpool.h create mode 100644 src/thpool.c diff --git a/examples/server/main.c b/examples/server/main.c index 80b0068..03519f1 100644 --- a/examples/server/main.c +++ b/examples/server/main.c @@ -21,7 +21,7 @@ int main(int argc, char *argv[]) { goto end; } - if (!lm_ctx_pools_serve(&ctx, argv[1])) { + if (!lm_ctx_pools_serve(&ctx, argv[1], 10)) { printf("failed to serve the pools: %s (%d)\n", lm_strerror(), lm_error()); goto end; } diff --git a/include/libmp.h b/include/libmp.h index 527237d..8e32a2e 100644 --- a/include/libmp.h +++ b/include/libmp.h @@ -8,4 +8,4 @@ lm_pool_t *lm_ctx_pools_add(lm_ctx_t *ctx, char *name, char *url); bool lm_ctx_pools_del(lm_ctx_t *ctx, char *name); void lm_ctx_pools_clear(lm_ctx_t *ctx); void lm_ctx_pools_test(lm_ctx_t *ctx); -bool lm_ctx_pools_serve(lm_ctx_t *ctx, char *addr); +bool lm_ctx_pools_serve(lm_ctx_t *ctx, char *addr, uint8_t threads); diff --git a/include/mptp.h b/include/mptp.h index 0ebd45d..f51b165 100644 --- a/include/mptp.h +++ b/include/mptp.h @@ -54,7 +54,8 @@ [8 bits] HOST SIZE ========================================================= - | All bits used for specifying HOST size + | All bits used for specifying HOST size, always zero + | for responses --------------------------------------------------------- [8 bits] DATA SIZE @@ -65,7 +66,7 @@ [...] HOST --------------------------------------------------------- | Plaintext server hostname, max size is 255 octets - | see the SIZE section + | see the SIZE section, always empty for responses --------------------------------------------------------- [...] DATA @@ -125,8 +126,10 @@ bool lm_mptp_init(lm_mptp_t *packet, bool is_request, uint8_t code, bool is_last bool lm_mptp_set_data(lm_mptp_t *packet, char *data, size_t size); bool lm_mptp_set_host(lm_mptp_t *packet, char *host); bool lm_mptp_get_host(lm_mptp_t *packet, char *host); +bool lm_mptp_get_data(lm_mptp_t *packet, char *data); int lm_mptp_socket(char *addr, uint16_t port, struct sockaddr *saddr); +void lm_mptp_copy(lm_mptp_t *dst, lm_mptp_t *src); bool lm_mptp_verify(lm_mptp_t *packet); void lm_mptp_close(int sock); diff --git a/include/pool.h b/include/pool.h index 750b169..cb57589 100644 --- a/include/pool.h +++ b/include/pool.h @@ -1,9 +1,20 @@ #pragma once +#include "mptp.h" #include "types.h" #include +#include + +typedef struct lm_pool_thread_arg { + int sock; + struct sockaddr addr; + lm_mptp_t packet; + lm_pool_t *pool; +} lm_pool_thread_arg_t; lm_pool_t *lm_pool_new(char *name, char *url); void lm_pool_test(lm_pool_t *pool); -bool lm_pool_info_file(lm_pool_t *pool, char *file); -bool lm_pool_info(lm_pool_t *pool, char *info); +bool lm_pool_info_load(lm_pool_t *pool, char *file); +void lm_pool_info_free(lm_pool_t *pool); void lm_pool_free(lm_pool_t *pool); +void lm_pool_serve_thread(void *arg); +void lm_pool_serve(lm_pool_t *pool, lm_mptp_t *packet, int sock, struct sockaddr *addr); diff --git a/include/thpool.h b/include/thpool.h new file mode 100644 index 0000000..d6a54b8 --- /dev/null +++ b/include/thpool.h @@ -0,0 +1,28 @@ +#pragma once +#include +#include + +typedef void (*lm_thfunc_t)(void *arg); +typedef struct lm_thwork_t { + lm_thfunc_t func; + void *arg; + struct lm_thwork_t *next; +} lm_thwork_t; + +typedef struct lm_thpool_t { + pthread_mutex_t mutex; + + pthread_cond_t work_lock; + pthread_cond_t thread_lock; + + size_t active; + size_t all; + + lm_thwork_t *first; + lm_thwork_t *last; + bool stop; +} lm_thpool_t; + +bool lm_thpool_init(lm_thpool_t *tp, int n); +bool lm_thpool_add(lm_thpool_t *tp, lm_thfunc_t func, void *data); +void lm_thpool_stop(lm_thpool_t *tp); diff --git a/include/types.h b/include/types.h index d73fbe0..7104aba 100644 --- a/include/types.h +++ b/include/types.h @@ -7,6 +7,7 @@ typedef struct lm_pool_info { char *maintainer; char *pubkey; size_t size; + char *file; } lm_pool_info_t; typedef struct lm_pool { diff --git a/locale/tr/LC_MESSAGES/libmp.po b/locale/tr/LC_MESSAGES/libmp.po index 1c38edd..f5a1412 100644 --- a/locale/tr/LC_MESSAGES/libmp.po +++ b/locale/tr/LC_MESSAGES/libmp.po @@ -8,7 +8,7 @@ msgid "" msgstr "" "Project-Id-Version: PACKAGE VERSION\n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2024-06-23 01:52+0300\n" +"POT-Creation-Date: 2024-06-25 20:58+0300\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME \n" "Language-Team: LANGUAGE \n" @@ -132,7 +132,7 @@ msgid "failed to bind MPTP socket" msgstr "" #: src/error.c:40 -msgid "required argument is a NULL pointer" +msgid "required argument is a NULL pointer or 0" msgstr "" #: src/error.c:41 diff --git a/src/error.c b/src/error.c index 97239b7..e24b960 100644 --- a/src/error.c +++ b/src/error.c @@ -37,7 +37,7 @@ char *lm_strerror() { {.code = LM_ERR_MPTPSetsockopt, .desc = _("failed to set MPTP socket options") }, {.code = LM_ERR_MPTPTimeout, .desc = _("MPTP connection timed out") }, {.code = LM_ERR_MPTPBindFail, .desc = _("failed to bind MPTP socket") }, - {.code = LM_ERR_ArgNULL, .desc = _("required argument is a NULL pointer") }, + {.code = LM_ERR_ArgNULL, .desc = _("required argument is a NULL pointer or 0") }, {.code = LM_ERR_MPTPNotRequest, .desc = _("not a MPTP request") }, {.code = LM_ERR_MPTPNotResponse, .desc = _("not a MPTP response") }, {.code = LM_ERR_MPTPNotLast, .desc = _("MPTP request last flag is not set") }, diff --git a/src/libmp.c b/src/libmp.c index eb75406..b8d59b4 100644 --- a/src/libmp.c +++ b/src/libmp.c @@ -26,10 +26,12 @@ #include "../include/error.h" #include "../include/mptp.h" #include "../include/pool.h" +#include "../include/thpool.h" #include "../include/util.h" #include #include #include +#include #include void lm_ctx_init(lm_ctx_t *ctx) { @@ -100,22 +102,30 @@ lm_pool_t *lm_ctx_pools_find(lm_ctx_t *ctx, char *name) { return NULL; } -bool lm_ctx_pools_have_host(lm_ctx_t *ctx, char *host) { +lm_pool_t *lm_ctx_pools_by_url(lm_ctx_t *ctx, char *host, char *path) { lm_pool_t *cur = ctx->pools; while (NULL != cur) { - if (eq(cur->url.host, host)) - return true; + if (eq(cur->url.host, host) && eq(cur->url.path, path)) + return cur; cur = cur->next; } - return false; + return NULL; } -bool lm_ctx_pools_serve(lm_ctx_t *ctx, char *addr) { +bool lm_ctx_pools_serve(lm_ctx_t *ctx, char *addr, uint8_t threads) { + if (NULL == addr || threads < 0) { + lm_error_set(LM_ERR_ArgNULL); + return false; + } + struct sockaddr saddr; char *host = NULL; lm_mptp_t packet; uint16_t port; int sock; + lm_thpool_t tp; + + lm_thpool_init(&tp, threads); if (!parse_host(addr, host, &port)) return false; @@ -135,37 +145,37 @@ bool lm_ctx_pools_serve(lm_ctx_t *ctx, char *addr) { } char hostname[packet.header.host_size + 1]; // +1 for NULL terminator + char path[packet.header.data_size + 1]; + + if (!lm_mptp_get_data(&packet, path)) { + pdebug(ctx, __func__, "skipping packet, failed to get path: %s", lm_strerror()); + continue; + } + if (!lm_mptp_get_host(&packet, hostname)) { pdebug(ctx, __func__, "skipping packet, failed to get hostname: %s", lm_strerror()); continue; } - if (!lm_ctx_pools_have_host(ctx, hostname)) { - pdebug(ctx, __func__, "unknown hostname, closing connection: %s", hostname); + lm_pool_t *pool = lm_ctx_pools_by_url(ctx, hostname, path); + + if (NULL == pool) { + pdebug(ctx, __func__, "unknown pool, closing connection: %s", hostname); lm_mptp_init(&packet, false, MPTP_S2C_BRUH, true); - goto end; + lm_mptp_server_send(sock, &packet, &saddr); + continue; } - switch (MPTP_FLAGS_CODE(&packet)) { - case MPTP_C2S_PING: - lm_mptp_init(&packet, false, MPTP_S2C_PONG, true); - goto end; + lm_pool_thread_arg_t *arg = malloc(sizeof(lm_pool_thread_arg_t)); - case MPTP_C2S_INFO: - if (packet.header.data_size <= 0) { - lm_mptp_init(&packet, false, MPTP_S2C_WHAT, true); - goto end; - } + memcpy(&arg->addr, &saddr, sizeof(struct sockaddr)); + lm_mptp_copy(&arg->packet, &packet); + arg->sock = sock; - lm_mptp_init(&packet, false, MPTP_S2C_COOL, true); - // send pool info - } - - end: - lm_mptp_set_host(&packet, hostname); - lm_mptp_server_send(sock, &packet, &saddr); + lm_thpool_add(&tp, lm_pool_serve_thread, arg); } + lm_thpool_stop(&tp); return true; } diff --git a/src/mptp.c b/src/mptp.c index b570cd5..0a872b8 100644 --- a/src/mptp.c +++ b/src/mptp.c @@ -45,7 +45,7 @@ bool lm_mptp_init(lm_mptp_t *packet, bool is_request, uint8_t code, bool is_last bool lm_mptp_set_host(lm_mptp_t *packet, char *host) { size_t size = strlen(host); - if (size > MPTP_HOST_MAX || size <= 0) { + if (size > MPTP_HOST_MAX || size < 0) { lm_error_set(LM_ERR_MPTPBadHost); return false; } @@ -57,7 +57,7 @@ bool lm_mptp_set_host(lm_mptp_t *packet, char *host) { } bool lm_mptp_get_host(lm_mptp_t *packet, char *host) { - if (packet->header.host_size > MPTP_HOST_MAX || packet->header.host_size <= 0) { + if (packet->header.host_size > MPTP_HOST_MAX || packet->header.host_size < 0) { host = NULL; lm_error_set(LM_ERR_BadHost); return false; @@ -79,6 +79,18 @@ bool lm_mptp_set_data(lm_mptp_t *packet, char *data, size_t size) { return true; } +bool lm_mptp_get_data(lm_mptp_t *packet, char *data) { + if (packet->header.data_size > MPTP_DATA_MAX || packet->header.data_size < 0) { + data = NULL; + lm_error_set(LM_ERR_BadHost); + return false; + } + + memcpy(data, packet->data, packet->header.data_size); + data[packet->header.data_size] = 0; + return true; +} + bool lm_mptp_verify(lm_mptp_t *packet) { if (NULL == packet) { lm_error_set(LM_ERR_ArgNULL); @@ -95,11 +107,6 @@ bool lm_mptp_verify(lm_mptp_t *packet) { return false; } - if (packet->header.host_size > MPTP_HOST_MAX || packet->header.host_size <= 0) { - lm_error_set(LM_ERR_MPTPBadHost); - return false; - } - if (MPTP_FLAGS_CODE(packet) > MPTP_CODE_MAX || MPTP_FLAGS_CODE(packet) < 0) { lm_error_set(LM_ERR_MPTPBadCode); return false; @@ -207,6 +214,11 @@ bool lm_mptp_client_verify(lm_mptp_t *packet) { return false; } + if (packet->header.host_size != 0) { + lm_error_set(LM_ERR_MPTPBadHost); + return false; + } + return true; } @@ -310,6 +322,11 @@ bool lm_mptp_server_verify(lm_mptp_t *packet) { return false; } + if (packet->header.host_size > MPTP_HOST_MAX || packet->header.host_size <= 0) { + lm_error_set(LM_ERR_MPTPBadHost); + return false; + } + if (!MPTP_IS_LAST(packet)) { lm_error_set(LM_ERR_MPTPNotLast); return false; @@ -366,7 +383,7 @@ bool lm_mptp_server_send(int sock, lm_mptp_t *packet, struct sockaddr *addr) { return false; } - if (packet->header.host_size > MPTP_HOST_MAX || packet->header.host_size <= 0) { + if (packet->header.host_size != 0) { lm_error_set(LM_ERR_MPTPBadHost); return false; } @@ -388,3 +405,9 @@ bool lm_mptp_server_send(int sock, lm_mptp_t *packet, struct sockaddr *addr) { return true; } + +void lm_mptp_copy(lm_mptp_t *dst, lm_mptp_t *src) { + memcpy(&dst->header, &src->header, sizeof(dst->header)); + memcpy(&dst->host, &src->data, sizeof(src->host)); + memcpy(&dst->data, &src->data, sizeof(src->data)); +} diff --git a/src/pool.c b/src/pool.c index 20a8705..40305a4 100644 --- a/src/pool.c +++ b/src/pool.c @@ -13,6 +13,9 @@ lm_pool_t *lm_pool_new(char *name, char *url) { pool->available = true; pool->name = name; + bzero(&pool->info, sizeof(pool->info)); + bzero(&pool->url, sizeof(pool->url)); + if (!lm_url_init(&pool->url, url)) { free(pool); return NULL; @@ -30,7 +33,9 @@ lm_pool_t *lm_pool_new(char *name, char *url) { void lm_pool_test(lm_pool_t *pool) { lm_mptp_t packet; lm_mptp_init(&packet, true, MPTP_C2S_PING, true); + lm_mptp_set_host(&packet, pool->url.host); + lm_mptp_set_data(&packet, pool->url.path, strlen(pool->url.path)); int sock = lm_mptp_client_connect(pool->url.host, pool->url.port); if (sock == -1) { @@ -78,23 +83,61 @@ int lm_pool_info_handler(void *data, const char *_section, const char *_key, con return 1; } -bool lm_pool_info_file(lm_pool_t *pool, char *file) { +bool lm_pool_info_load(lm_pool_t *pool, char *file) { + lm_pool_info_free(pool); + if (ini_parse(file, lm_pool_info_handler, pool) < 0) { lm_error_set(LM_ERR_PoolInfoBad); return false; } + + pool->info.file = strdup(file); return true; } -bool lm_pool_info(lm_pool_t *pool, char *info) { - if (ini_parse_string(info, lm_pool_info_handler, pool) < 0) { - lm_error_set(LM_ERR_PoolInfoBad); - return false; - } - return true; +void lm_pool_info_free(lm_pool_t *pool) { + free(pool->info.file); + free(pool->info.pubkey); + free(pool->info.maintainer); + bzero(&pool->info, sizeof(pool->info)); } void lm_pool_free(lm_pool_t *pool) { lm_url_free(&pool->url); + lm_pool_info_free(pool); free(pool); } + +void lm_pool_serve(lm_pool_t *pool, lm_mptp_t *packet, int sock, struct sockaddr *addr) { + switch (MPTP_FLAGS_CODE(packet)) { + case MPTP_C2S_PING: + lm_mptp_init(packet, false, MPTP_S2C_PONG, true); + goto end; + + case MPTP_C2S_INFO: + if (NULL == pool->info.file) { + lm_mptp_init(packet, false, MPTP_S2C_BRUH, true); + goto end; + } + + FILE *info = fopen(pool->info.file, "r"); + size_t read = 0; + + while ((read = fread(packet->data, 1, MPTP_DATA_MAX, info)) > 0) { + lm_mptp_server_send(sock, packet, addr); + lm_mptp_init(packet, false, MPTP_S2C_COOL, false); + } + + fclose(info); + lm_mptp_init(packet, false, MPTP_S2C_COOL, true); + } + +end: + lm_mptp_server_send(sock, packet, addr); +} + +void lm_pool_serve_thread(void *_arg) { + lm_pool_thread_arg_t *arg = _arg; + lm_pool_serve(arg->pool, &arg->packet, arg->sock, &arg->addr); + free(arg); +} diff --git a/src/thpool.c b/src/thpool.c new file mode 100644 index 0000000..be12a0c --- /dev/null +++ b/src/thpool.c @@ -0,0 +1,120 @@ +#include "../include/thpool.h" +#include +#include + +lm_thwork_t *lm_thpool_work(lm_thfunc_t func, void *arg) { + lm_thwork_t *work = malloc(sizeof(lm_thwork_t)); + work->next = NULL; + work->func = func; + work->arg = arg; + return work; +} + +void lm_thpool_free(lm_thwork_t *work) { + free(work); +} + +lm_thwork_t *lm_thpool_get(lm_thpool_t *tp) { + lm_thwork_t *work; + work = tp->first; + if (NULL == work) + return NULL; + + tp->first = work->next; + if (NULL == tp->first) { + tp->last = NULL; + } + + return work; +} + +void *lm_thpool_worker(void *arg) { + lm_thpool_t *tp = arg; + lm_thwork_t *work; + + while (true) { + pthread_mutex_lock(&(tp->mutex)); + while (tp->first == NULL && !tp->stop) + pthread_cond_wait(&(tp->work_lock), &(tp->mutex)); + + if (tp->stop) + break; + + work = lm_thpool_get(tp); + tp->active++; + pthread_mutex_unlock(&(tp->mutex)); + + if (work != NULL) { + work->func(work->arg); + lm_thpool_free(work); + } + + pthread_mutex_lock(&(tp->mutex)); + tp->active--; + if (!tp->stop && tp->active == 0 && tp->first == NULL) + pthread_cond_signal(&(tp->thread_lock)); + pthread_mutex_unlock(&(tp->mutex)); + } + + tp->all--; + pthread_cond_signal(&(tp->thread_lock)); + pthread_mutex_unlock(&(tp->mutex)); + return NULL; +} + +bool lm_thpool_init(lm_thpool_t *tp, int n) { + bzero(tp, sizeof(lm_thpool_t)); + tp->all = n; + + pthread_mutex_init(&(tp->mutex), NULL); + pthread_cond_init(&(tp->work_lock), NULL); + pthread_cond_init(&(tp->thread_lock), NULL); + + tp->first = NULL; + tp->last = NULL; + + pthread_t handle; + for (int i = 0; i < n; i++) { + pthread_create(&handle, NULL, lm_thpool_worker, tp); + pthread_detach(handle); + } + return tp; +} + +bool lm_thpool_add(lm_thpool_t *tp, lm_thfunc_t func, void *arg) { + lm_thwork_t *work = lm_thpool_work(func, arg); + if (work == NULL) + return false; + + pthread_mutex_lock(&(tp->mutex)); + if (tp->first == NULL) { + tp->first = work; + tp->last = tp->first; + } else { + tp->last->next = work; + tp->last = work; + } + + pthread_cond_broadcast(&(tp->work_lock)); + pthread_mutex_unlock(&(tp->mutex)); + return true; +} + +void lm_thpool_stop(lm_thpool_t *tp) { + pthread_mutex_lock(&(tp->mutex)); + + lm_thwork_t *f = tp->first; + while (f != NULL) { + lm_thwork_t *n = f->next; + lm_thpool_free(n); + f = n; + } + + tp->stop = true; + pthread_cond_broadcast(&(tp->work_lock)); + pthread_mutex_unlock(&(tp->mutex)); + + pthread_mutex_destroy(&(tp->mutex)); + pthread_cond_destroy(&(tp->work_lock)); + pthread_cond_destroy(&(tp->thread_lock)); +} diff --git a/src/url.c b/src/url.c index 8184ce0..84e0460 100644 --- a/src/url.c +++ b/src/url.c @@ -233,4 +233,6 @@ void lm_url_free(lm_url_t *url) { if (NULL != url->path) free(url->path); + + bzero(url, sizeof(lm_url_t)); }