我正在使用 epoll、非阻塞套接字和自定义线程池用 C 语言构建一个多线程服务器。该服务器应该同时处理多个客户端连接。但是,当我运行...
我正在用 C 语言构建一个多线程服务器,使用 epoll、非阻塞套接字和自定义线程池。该服务器应该同时处理多个客户端连接。但是,当我同时运行两个客户端应用程序实例时,服务器崩溃了。
这是我的服务器代码的关键部分,我在这里处理传入的连接并将工作委托给线程:
void esperar_peticion(int listenSock) {
int clientSock, returnCode;
struct epoll_event events[BACKLOG];
while (1) {
int cantFds = epoll_wait(epfd, events, BACKLOG, -1);
if (cantFds < 0) quit("epoll_wait");
for (int i = 0; i < cantFds; i++) {
if (events[i].data.fd == listenSock) {
clientSock = accept(listenSock, NULL, NULL);
if (clientSock < 0)
quit("accept");
else
printf("Connection established\n");
struct epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = clientSock;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, clientSock, &event) < 0) quit("epoll_ctl");
} else {
int fd = events[i].data.fd;
struct epoll_event event;
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &event);
int *fd_ptr = malloc(sizeof(int));
if (fd_ptr == NULL) {
perror("malloc");
continue;
}
*fd_ptr = fd;
pool_submit(&handle_client, fd_ptr);
}
}
}
}
void handle_client(void *param) {
int clientSock = *((int*) param);
free(param);
int cmd = 0;
if (readn(clientSock, &cmd, 1) < 0) {
close(clientSock);
return;
}
switch(cmd){
case PUT:
handle_put(clientSock);
break;
case GET:
handle_get(clientSock);
break;
case DEL:
handle_del(clientSock);
break;
case STATS:
handle_stats(clientSock);
break;
case MEMORY:
rlim_t res = get_memory_limit();
char res_str[32];
snprintf(res_str, sizeof(res_str), "%lu", (unsigned long)res);
send_var(clientSock, strlen(res_str), res_str);
break;
default:
int res1 = EINVAL;
writen(clientSock, &res1, 1);
break;
}
struct epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = clientSock;
pthread_mutex_lock(&lock);
if (epoll_ctl(epfd, EPOLL_CTL_ADD, clientSock, &event) < 0) {
close(clientSock);
pthread_mutex_unlock(&lock);
quit("epoll_ctl");
}
pthread_mutex_unlock(&lock);
}
这是管理工作线程的线程池的实现:
typedef struct task_t {
void (*function)(void* arg);
void* data;
struct task_t* next;
} task;
task* task_head = NULL;
task* task_tail = NULL;
pthread_mutex_t pool_mutex;
sem_t task_sem;
pthread_t* pool_array;
int NUMBER_OF_THREADS;
void pool_init(void) {
NUMBER_OF_THREADS = (int) sysconf(_SC_NPROCESSORS_ONLN);
printf("Num hardware threads: %i\n", NUMBER_OF_THREADS);
pool_array = malloc(NUMBER_OF_THREADS * sizeof(pthread_t));
pthread_mutex_init(&pool_mutex, NULL);
sem_init(&task_sem, 0, 0);
for(int i=0; i < NUMBER_OF_THREADS; i++) {
pthread_create(&pool_array[i], NULL, wait_for_tasks, NULL);
}
}
int pool_submit(void (*funcion)(void* p), void* arg) {
pthread_mutex_lock(&pool_mutex);
enqueue(funcion, arg);
pthread_mutex_unlock(&pool_mutex);
sem_post(&task_sem);
return 0;
}
void* wait_for_tasks(void* param) {
while(1) {
sem_wait(&task_sem);
pthread_mutex_lock(&pool_mutex);
task* tarea = dequeue();
pthread_mutex_unlock(&pool_mutex);
if (tarea != NULL) {
printf("Command being processed by thread %lu\n", (unsigned long)pthread_self());
execute_task(tarea->function, tarea->data);
free(tarea);
}
}
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
// Cantidad de operaciones PUT a ejecutarse
// del tipo PUT clavei valor i
#define MAX_ENTRIES 10000
void test() {
int pipe_fd[2];
if (pipe(pipe_fd) == -1) {
perror("pipe");
exit(EXIT_FAILURE);
}
pid_t pid = fork();
if (pid == -1) {
perror("fork");
exit(EXIT_FAILURE);
}
if (pid == 0) {
close(pipe_fd[1]);
// hacemos que stdin lea el pipe
// ya que client.out toma comandos desde stdin
dup2(pipe_fd[0], STDIN_FILENO);
close(pipe_fd[0]);
// ejecutamos ./client.out
execl("./client.out", "./client.out", NULL);
perror("execl");
exit(EXIT_FAILURE);
} else {
close(pipe_fd[0]);
FILE *pipe_write = fdopen(pipe_fd[1], "w");
if (!pipe_write) {
perror("fdopen");
exit(EXIT_FAILURE);
}
// Manda los comandos PUT
for (int i = 0; i < MAX_ENTRIES; ++i) {
fprintf(pipe_write, "PUT key%d value%d\n", i, i);
fflush(pipe_write);
}
fclose(pipe_write);
int status;
waitpid(pid, &status, 0); //
if (WIFEXITED(status)) {
if (WEXITSTATUS(status) != 0) {
fprintf(stderr, "client.out fallo con %d\n", WEXITSTATUS(status));
exit(EXIT_FAILURE);
}
} else {
fprintf(stderr, "client.out no termino como debe\n");
exit(EXIT_FAILURE);
}
}
}
int main() {
printf("Ejecutando operaciones PUT para llenar la cache..\n");
test();
printf("Se realizaron las operaciones PUT.\n");
return 0;
}