new: add pool server handler

This commit is contained in:
ngn 2024-06-25 21:21:15 +03:00
parent 88a396d52f
commit 92bc029d0a
13 changed files with 289 additions and 48 deletions

View File

@ -21,7 +21,7 @@ int main(int argc, char *argv[]) {
goto end; goto end;
} }
if (!lm_ctx_pools_serve(&ctx, argv[1])) { if (!lm_ctx_pools_serve(&ctx, argv[1], 10)) {
printf("failed to serve the pools: %s (%d)\n", lm_strerror(), lm_error()); printf("failed to serve the pools: %s (%d)\n", lm_strerror(), lm_error());
goto end; goto end;
} }

View File

@ -8,4 +8,4 @@ lm_pool_t *lm_ctx_pools_add(lm_ctx_t *ctx, char *name, char *url);
bool lm_ctx_pools_del(lm_ctx_t *ctx, char *name); bool lm_ctx_pools_del(lm_ctx_t *ctx, char *name);
void lm_ctx_pools_clear(lm_ctx_t *ctx); void lm_ctx_pools_clear(lm_ctx_t *ctx);
void lm_ctx_pools_test(lm_ctx_t *ctx); void lm_ctx_pools_test(lm_ctx_t *ctx);
bool lm_ctx_pools_serve(lm_ctx_t *ctx, char *addr); bool lm_ctx_pools_serve(lm_ctx_t *ctx, char *addr, uint8_t threads);

View File

@ -54,7 +54,8 @@
[8 bits] HOST SIZE [8 bits] HOST SIZE
========================================================= =========================================================
| All bits used for specifying HOST size | All bits used for specifying HOST size, always zero
| for responses
--------------------------------------------------------- ---------------------------------------------------------
[8 bits] DATA SIZE [8 bits] DATA SIZE
@ -65,7 +66,7 @@
[...] HOST [...] HOST
--------------------------------------------------------- ---------------------------------------------------------
| Plaintext server hostname, max size is 255 octets | Plaintext server hostname, max size is 255 octets
| see the SIZE section | see the SIZE section, always empty for responses
--------------------------------------------------------- ---------------------------------------------------------
[...] DATA [...] DATA
@ -125,8 +126,10 @@ bool lm_mptp_init(lm_mptp_t *packet, bool is_request, uint8_t code, bool is_last
bool lm_mptp_set_data(lm_mptp_t *packet, char *data, size_t size); bool lm_mptp_set_data(lm_mptp_t *packet, char *data, size_t size);
bool lm_mptp_set_host(lm_mptp_t *packet, char *host); bool lm_mptp_set_host(lm_mptp_t *packet, char *host);
bool lm_mptp_get_host(lm_mptp_t *packet, char *host); bool lm_mptp_get_host(lm_mptp_t *packet, char *host);
bool lm_mptp_get_data(lm_mptp_t *packet, char *data);
int lm_mptp_socket(char *addr, uint16_t port, struct sockaddr *saddr); int lm_mptp_socket(char *addr, uint16_t port, struct sockaddr *saddr);
void lm_mptp_copy(lm_mptp_t *dst, lm_mptp_t *src);
bool lm_mptp_verify(lm_mptp_t *packet); bool lm_mptp_verify(lm_mptp_t *packet);
void lm_mptp_close(int sock); void lm_mptp_close(int sock);

View File

@ -1,9 +1,20 @@
#pragma once #pragma once
#include "mptp.h"
#include "types.h" #include "types.h"
#include <stdbool.h> #include <stdbool.h>
#include <sys/socket.h>
typedef struct lm_pool_thread_arg {
int sock;
struct sockaddr addr;
lm_mptp_t packet;
lm_pool_t *pool;
} lm_pool_thread_arg_t;
lm_pool_t *lm_pool_new(char *name, char *url); lm_pool_t *lm_pool_new(char *name, char *url);
void lm_pool_test(lm_pool_t *pool); void lm_pool_test(lm_pool_t *pool);
bool lm_pool_info_file(lm_pool_t *pool, char *file); bool lm_pool_info_load(lm_pool_t *pool, char *file);
bool lm_pool_info(lm_pool_t *pool, char *info); void lm_pool_info_free(lm_pool_t *pool);
void lm_pool_free(lm_pool_t *pool); void lm_pool_free(lm_pool_t *pool);
void lm_pool_serve_thread(void *arg);
void lm_pool_serve(lm_pool_t *pool, lm_mptp_t *packet, int sock, struct sockaddr *addr);

28
include/thpool.h Normal file
View File

@ -0,0 +1,28 @@
#pragma once
#include <pthread.h>
#include <stdbool.h>
typedef void (*lm_thfunc_t)(void *arg);
typedef struct lm_thwork_t {
lm_thfunc_t func;
void *arg;
struct lm_thwork_t *next;
} lm_thwork_t;
typedef struct lm_thpool_t {
pthread_mutex_t mutex;
pthread_cond_t work_lock;
pthread_cond_t thread_lock;
size_t active;
size_t all;
lm_thwork_t *first;
lm_thwork_t *last;
bool stop;
} lm_thpool_t;
bool lm_thpool_init(lm_thpool_t *tp, int n);
bool lm_thpool_add(lm_thpool_t *tp, lm_thfunc_t func, void *data);
void lm_thpool_stop(lm_thpool_t *tp);

View File

@ -7,6 +7,7 @@ typedef struct lm_pool_info {
char *maintainer; char *maintainer;
char *pubkey; char *pubkey;
size_t size; size_t size;
char *file;
} lm_pool_info_t; } lm_pool_info_t;
typedef struct lm_pool { typedef struct lm_pool {

View File

@ -8,7 +8,7 @@ msgid ""
msgstr "" msgstr ""
"Project-Id-Version: PACKAGE VERSION\n" "Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n" "Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2024-06-23 01:52+0300\n" "POT-Creation-Date: 2024-06-25 20:58+0300\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n" "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n" "Language-Team: LANGUAGE <LL@li.org>\n"
@ -132,7 +132,7 @@ msgid "failed to bind MPTP socket"
msgstr "" msgstr ""
#: src/error.c:40 #: src/error.c:40
msgid "required argument is a NULL pointer" msgid "required argument is a NULL pointer or 0"
msgstr "" msgstr ""
#: src/error.c:41 #: src/error.c:41

View File

@ -37,7 +37,7 @@ char *lm_strerror() {
{.code = LM_ERR_MPTPSetsockopt, .desc = _("failed to set MPTP socket options") }, {.code = LM_ERR_MPTPSetsockopt, .desc = _("failed to set MPTP socket options") },
{.code = LM_ERR_MPTPTimeout, .desc = _("MPTP connection timed out") }, {.code = LM_ERR_MPTPTimeout, .desc = _("MPTP connection timed out") },
{.code = LM_ERR_MPTPBindFail, .desc = _("failed to bind MPTP socket") }, {.code = LM_ERR_MPTPBindFail, .desc = _("failed to bind MPTP socket") },
{.code = LM_ERR_ArgNULL, .desc = _("required argument is a NULL pointer") }, {.code = LM_ERR_ArgNULL, .desc = _("required argument is a NULL pointer or 0") },
{.code = LM_ERR_MPTPNotRequest, .desc = _("not a MPTP request") }, {.code = LM_ERR_MPTPNotRequest, .desc = _("not a MPTP request") },
{.code = LM_ERR_MPTPNotResponse, .desc = _("not a MPTP response") }, {.code = LM_ERR_MPTPNotResponse, .desc = _("not a MPTP response") },
{.code = LM_ERR_MPTPNotLast, .desc = _("MPTP request last flag is not set") }, {.code = LM_ERR_MPTPNotLast, .desc = _("MPTP request last flag is not set") },

View File

@ -26,10 +26,12 @@
#include "../include/error.h" #include "../include/error.h"
#include "../include/mptp.h" #include "../include/mptp.h"
#include "../include/pool.h" #include "../include/pool.h"
#include "../include/thpool.h"
#include "../include/util.h" #include "../include/util.h"
#include <libintl.h> #include <libintl.h>
#include <locale.h> #include <locale.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include <sys/socket.h> #include <sys/socket.h>
void lm_ctx_init(lm_ctx_t *ctx) { void lm_ctx_init(lm_ctx_t *ctx) {
@ -100,22 +102,30 @@ lm_pool_t *lm_ctx_pools_find(lm_ctx_t *ctx, char *name) {
return NULL; return NULL;
} }
bool lm_ctx_pools_have_host(lm_ctx_t *ctx, char *host) { lm_pool_t *lm_ctx_pools_by_url(lm_ctx_t *ctx, char *host, char *path) {
lm_pool_t *cur = ctx->pools; lm_pool_t *cur = ctx->pools;
while (NULL != cur) { while (NULL != cur) {
if (eq(cur->url.host, host)) if (eq(cur->url.host, host) && eq(cur->url.path, path))
return true; return cur;
cur = cur->next; cur = cur->next;
} }
return false; return NULL;
} }
bool lm_ctx_pools_serve(lm_ctx_t *ctx, char *addr) { bool lm_ctx_pools_serve(lm_ctx_t *ctx, char *addr, uint8_t threads) {
if (NULL == addr || threads < 0) {
lm_error_set(LM_ERR_ArgNULL);
return false;
}
struct sockaddr saddr; struct sockaddr saddr;
char *host = NULL; char *host = NULL;
lm_mptp_t packet; lm_mptp_t packet;
uint16_t port; uint16_t port;
int sock; int sock;
lm_thpool_t tp;
lm_thpool_init(&tp, threads);
if (!parse_host(addr, host, &port)) if (!parse_host(addr, host, &port))
return false; return false;
@ -135,37 +145,37 @@ bool lm_ctx_pools_serve(lm_ctx_t *ctx, char *addr) {
} }
char hostname[packet.header.host_size + 1]; // +1 for NULL terminator char hostname[packet.header.host_size + 1]; // +1 for NULL terminator
char path[packet.header.data_size + 1];
if (!lm_mptp_get_data(&packet, path)) {
pdebug(ctx, __func__, "skipping packet, failed to get path: %s", lm_strerror());
continue;
}
if (!lm_mptp_get_host(&packet, hostname)) { if (!lm_mptp_get_host(&packet, hostname)) {
pdebug(ctx, __func__, "skipping packet, failed to get hostname: %s", lm_strerror()); pdebug(ctx, __func__, "skipping packet, failed to get hostname: %s", lm_strerror());
continue; continue;
} }
if (!lm_ctx_pools_have_host(ctx, hostname)) { lm_pool_t *pool = lm_ctx_pools_by_url(ctx, hostname, path);
pdebug(ctx, __func__, "unknown hostname, closing connection: %s", hostname);
if (NULL == pool) {
pdebug(ctx, __func__, "unknown pool, closing connection: %s", hostname);
lm_mptp_init(&packet, false, MPTP_S2C_BRUH, true); lm_mptp_init(&packet, false, MPTP_S2C_BRUH, true);
goto end; lm_mptp_server_send(sock, &packet, &saddr);
continue;
} }
switch (MPTP_FLAGS_CODE(&packet)) { lm_pool_thread_arg_t *arg = malloc(sizeof(lm_pool_thread_arg_t));
case MPTP_C2S_PING:
lm_mptp_init(&packet, false, MPTP_S2C_PONG, true);
goto end;
case MPTP_C2S_INFO: memcpy(&arg->addr, &saddr, sizeof(struct sockaddr));
if (packet.header.data_size <= 0) { lm_mptp_copy(&arg->packet, &packet);
lm_mptp_init(&packet, false, MPTP_S2C_WHAT, true); arg->sock = sock;
goto end;
}
lm_mptp_init(&packet, false, MPTP_S2C_COOL, true); lm_thpool_add(&tp, lm_pool_serve_thread, arg);
// send pool info
}
end:
lm_mptp_set_host(&packet, hostname);
lm_mptp_server_send(sock, &packet, &saddr);
} }
lm_thpool_stop(&tp);
return true; return true;
} }

View File

@ -45,7 +45,7 @@ bool lm_mptp_init(lm_mptp_t *packet, bool is_request, uint8_t code, bool is_last
bool lm_mptp_set_host(lm_mptp_t *packet, char *host) { bool lm_mptp_set_host(lm_mptp_t *packet, char *host) {
size_t size = strlen(host); size_t size = strlen(host);
if (size > MPTP_HOST_MAX || size <= 0) { if (size > MPTP_HOST_MAX || size < 0) {
lm_error_set(LM_ERR_MPTPBadHost); lm_error_set(LM_ERR_MPTPBadHost);
return false; return false;
} }
@ -57,7 +57,7 @@ bool lm_mptp_set_host(lm_mptp_t *packet, char *host) {
} }
bool lm_mptp_get_host(lm_mptp_t *packet, char *host) { bool lm_mptp_get_host(lm_mptp_t *packet, char *host) {
if (packet->header.host_size > MPTP_HOST_MAX || packet->header.host_size <= 0) { if (packet->header.host_size > MPTP_HOST_MAX || packet->header.host_size < 0) {
host = NULL; host = NULL;
lm_error_set(LM_ERR_BadHost); lm_error_set(LM_ERR_BadHost);
return false; return false;
@ -79,6 +79,18 @@ bool lm_mptp_set_data(lm_mptp_t *packet, char *data, size_t size) {
return true; return true;
} }
bool lm_mptp_get_data(lm_mptp_t *packet, char *data) {
if (packet->header.data_size > MPTP_DATA_MAX || packet->header.data_size < 0) {
data = NULL;
lm_error_set(LM_ERR_BadHost);
return false;
}
memcpy(data, packet->data, packet->header.data_size);
data[packet->header.data_size] = 0;
return true;
}
bool lm_mptp_verify(lm_mptp_t *packet) { bool lm_mptp_verify(lm_mptp_t *packet) {
if (NULL == packet) { if (NULL == packet) {
lm_error_set(LM_ERR_ArgNULL); lm_error_set(LM_ERR_ArgNULL);
@ -95,11 +107,6 @@ bool lm_mptp_verify(lm_mptp_t *packet) {
return false; return false;
} }
if (packet->header.host_size > MPTP_HOST_MAX || packet->header.host_size <= 0) {
lm_error_set(LM_ERR_MPTPBadHost);
return false;
}
if (MPTP_FLAGS_CODE(packet) > MPTP_CODE_MAX || MPTP_FLAGS_CODE(packet) < 0) { if (MPTP_FLAGS_CODE(packet) > MPTP_CODE_MAX || MPTP_FLAGS_CODE(packet) < 0) {
lm_error_set(LM_ERR_MPTPBadCode); lm_error_set(LM_ERR_MPTPBadCode);
return false; return false;
@ -207,6 +214,11 @@ bool lm_mptp_client_verify(lm_mptp_t *packet) {
return false; return false;
} }
if (packet->header.host_size != 0) {
lm_error_set(LM_ERR_MPTPBadHost);
return false;
}
return true; return true;
} }
@ -310,6 +322,11 @@ bool lm_mptp_server_verify(lm_mptp_t *packet) {
return false; return false;
} }
if (packet->header.host_size > MPTP_HOST_MAX || packet->header.host_size <= 0) {
lm_error_set(LM_ERR_MPTPBadHost);
return false;
}
if (!MPTP_IS_LAST(packet)) { if (!MPTP_IS_LAST(packet)) {
lm_error_set(LM_ERR_MPTPNotLast); lm_error_set(LM_ERR_MPTPNotLast);
return false; return false;
@ -366,7 +383,7 @@ bool lm_mptp_server_send(int sock, lm_mptp_t *packet, struct sockaddr *addr) {
return false; return false;
} }
if (packet->header.host_size > MPTP_HOST_MAX || packet->header.host_size <= 0) { if (packet->header.host_size != 0) {
lm_error_set(LM_ERR_MPTPBadHost); lm_error_set(LM_ERR_MPTPBadHost);
return false; return false;
} }
@ -388,3 +405,9 @@ bool lm_mptp_server_send(int sock, lm_mptp_t *packet, struct sockaddr *addr) {
return true; return true;
} }
void lm_mptp_copy(lm_mptp_t *dst, lm_mptp_t *src) {
memcpy(&dst->header, &src->header, sizeof(dst->header));
memcpy(&dst->host, &src->data, sizeof(src->host));
memcpy(&dst->data, &src->data, sizeof(src->data));
}

View File

@ -13,6 +13,9 @@ lm_pool_t *lm_pool_new(char *name, char *url) {
pool->available = true; pool->available = true;
pool->name = name; pool->name = name;
bzero(&pool->info, sizeof(pool->info));
bzero(&pool->url, sizeof(pool->url));
if (!lm_url_init(&pool->url, url)) { if (!lm_url_init(&pool->url, url)) {
free(pool); free(pool);
return NULL; return NULL;
@ -30,7 +33,9 @@ lm_pool_t *lm_pool_new(char *name, char *url) {
void lm_pool_test(lm_pool_t *pool) { void lm_pool_test(lm_pool_t *pool) {
lm_mptp_t packet; lm_mptp_t packet;
lm_mptp_init(&packet, true, MPTP_C2S_PING, true); lm_mptp_init(&packet, true, MPTP_C2S_PING, true);
lm_mptp_set_host(&packet, pool->url.host); lm_mptp_set_host(&packet, pool->url.host);
lm_mptp_set_data(&packet, pool->url.path, strlen(pool->url.path));
int sock = lm_mptp_client_connect(pool->url.host, pool->url.port); int sock = lm_mptp_client_connect(pool->url.host, pool->url.port);
if (sock == -1) { if (sock == -1) {
@ -78,23 +83,61 @@ int lm_pool_info_handler(void *data, const char *_section, const char *_key, con
return 1; return 1;
} }
bool lm_pool_info_file(lm_pool_t *pool, char *file) { bool lm_pool_info_load(lm_pool_t *pool, char *file) {
lm_pool_info_free(pool);
if (ini_parse(file, lm_pool_info_handler, pool) < 0) { if (ini_parse(file, lm_pool_info_handler, pool) < 0) {
lm_error_set(LM_ERR_PoolInfoBad); lm_error_set(LM_ERR_PoolInfoBad);
return false; return false;
} }
pool->info.file = strdup(file);
return true; return true;
} }
bool lm_pool_info(lm_pool_t *pool, char *info) { void lm_pool_info_free(lm_pool_t *pool) {
if (ini_parse_string(info, lm_pool_info_handler, pool) < 0) { free(pool->info.file);
lm_error_set(LM_ERR_PoolInfoBad); free(pool->info.pubkey);
return false; free(pool->info.maintainer);
} bzero(&pool->info, sizeof(pool->info));
return true;
} }
void lm_pool_free(lm_pool_t *pool) { void lm_pool_free(lm_pool_t *pool) {
lm_url_free(&pool->url); lm_url_free(&pool->url);
lm_pool_info_free(pool);
free(pool); free(pool);
} }
void lm_pool_serve(lm_pool_t *pool, lm_mptp_t *packet, int sock, struct sockaddr *addr) {
switch (MPTP_FLAGS_CODE(packet)) {
case MPTP_C2S_PING:
lm_mptp_init(packet, false, MPTP_S2C_PONG, true);
goto end;
case MPTP_C2S_INFO:
if (NULL == pool->info.file) {
lm_mptp_init(packet, false, MPTP_S2C_BRUH, true);
goto end;
}
FILE *info = fopen(pool->info.file, "r");
size_t read = 0;
while ((read = fread(packet->data, 1, MPTP_DATA_MAX, info)) > 0) {
lm_mptp_server_send(sock, packet, addr);
lm_mptp_init(packet, false, MPTP_S2C_COOL, false);
}
fclose(info);
lm_mptp_init(packet, false, MPTP_S2C_COOL, true);
}
end:
lm_mptp_server_send(sock, packet, addr);
}
void lm_pool_serve_thread(void *_arg) {
lm_pool_thread_arg_t *arg = _arg;
lm_pool_serve(arg->pool, &arg->packet, arg->sock, &arg->addr);
free(arg);
}

120
src/thpool.c Normal file
View File

@ -0,0 +1,120 @@
#include "../include/thpool.h"
#include <stdlib.h>
#include <strings.h>
lm_thwork_t *lm_thpool_work(lm_thfunc_t func, void *arg) {
lm_thwork_t *work = malloc(sizeof(lm_thwork_t));
work->next = NULL;
work->func = func;
work->arg = arg;
return work;
}
void lm_thpool_free(lm_thwork_t *work) {
free(work);
}
lm_thwork_t *lm_thpool_get(lm_thpool_t *tp) {
lm_thwork_t *work;
work = tp->first;
if (NULL == work)
return NULL;
tp->first = work->next;
if (NULL == tp->first) {
tp->last = NULL;
}
return work;
}
void *lm_thpool_worker(void *arg) {
lm_thpool_t *tp = arg;
lm_thwork_t *work;
while (true) {
pthread_mutex_lock(&(tp->mutex));
while (tp->first == NULL && !tp->stop)
pthread_cond_wait(&(tp->work_lock), &(tp->mutex));
if (tp->stop)
break;
work = lm_thpool_get(tp);
tp->active++;
pthread_mutex_unlock(&(tp->mutex));
if (work != NULL) {
work->func(work->arg);
lm_thpool_free(work);
}
pthread_mutex_lock(&(tp->mutex));
tp->active--;
if (!tp->stop && tp->active == 0 && tp->first == NULL)
pthread_cond_signal(&(tp->thread_lock));
pthread_mutex_unlock(&(tp->mutex));
}
tp->all--;
pthread_cond_signal(&(tp->thread_lock));
pthread_mutex_unlock(&(tp->mutex));
return NULL;
}
bool lm_thpool_init(lm_thpool_t *tp, int n) {
bzero(tp, sizeof(lm_thpool_t));
tp->all = n;
pthread_mutex_init(&(tp->mutex), NULL);
pthread_cond_init(&(tp->work_lock), NULL);
pthread_cond_init(&(tp->thread_lock), NULL);
tp->first = NULL;
tp->last = NULL;
pthread_t handle;
for (int i = 0; i < n; i++) {
pthread_create(&handle, NULL, lm_thpool_worker, tp);
pthread_detach(handle);
}
return tp;
}
bool lm_thpool_add(lm_thpool_t *tp, lm_thfunc_t func, void *arg) {
lm_thwork_t *work = lm_thpool_work(func, arg);
if (work == NULL)
return false;
pthread_mutex_lock(&(tp->mutex));
if (tp->first == NULL) {
tp->first = work;
tp->last = tp->first;
} else {
tp->last->next = work;
tp->last = work;
}
pthread_cond_broadcast(&(tp->work_lock));
pthread_mutex_unlock(&(tp->mutex));
return true;
}
void lm_thpool_stop(lm_thpool_t *tp) {
pthread_mutex_lock(&(tp->mutex));
lm_thwork_t *f = tp->first;
while (f != NULL) {
lm_thwork_t *n = f->next;
lm_thpool_free(n);
f = n;
}
tp->stop = true;
pthread_cond_broadcast(&(tp->work_lock));
pthread_mutex_unlock(&(tp->mutex));
pthread_mutex_destroy(&(tp->mutex));
pthread_cond_destroy(&(tp->work_lock));
pthread_cond_destroy(&(tp->thread_lock));
}

View File

@ -233,4 +233,6 @@ void lm_url_free(lm_url_t *url) {
if (NULL != url->path) if (NULL != url->path)
free(url->path); free(url->path);
bzero(url, sizeof(lm_url_t));
} }