LinuxSir.cn,穿越时空的Linuxsir!

 找回密码
 注册
搜索
热搜: shell linux mysql
查看: 2375|回复: 3

[原创]教你开发一个高效的服务器

[复制链接]
发表于 2008-7-28 18:02:17 | 显示全部楼层 |阅读模式
教你开发一个高效的服务器

转载请注明作者信息
作者:lijiuwei
邮箱:lijiuwei0902@gmail.com

高效服务器的关键技术:
1.需要有固定数量的线程来去处理请求,而不可以每次收到请求都fork或者create_thread;
2.如果socket使用堵塞模式,那么read必须要有超时,如果使用非堵塞模式,那么read就必须要有数据缓冲;
3.使用epoll分别在不同的线程中监视可读和可写;
4.对于tcp长连接使用主线程分配连接,然后由任务线程处理,一个任务线程对应一个任务队列;对于tcp短连接和udp连接,使用竞争线程;
5.时刻检测并且关闭超时连接;
6.真正关闭socket的地方只能由一处,其他地方发现其socket需要关闭只能标识socket应该关闭了,然后由那一处安全的关闭socket;
7.当服务器需要不断的发送硬盘上的数据给客户端(比如说文件传输服务器和流媒体服务器),那么服务器端必须先从硬盘加载数据到数据缓存,再从数据缓冲发送数据给客户端;
8.内存分配必须成功,不然线程睡眠然后重新分配直到分配成功为止;

通用设计方案:
现在我们设计一个通用的服务器,这个服务器接受客户端的连接,然后不断读取客户端的请求,在某一时刻又转为不断的发送数据给客户端;下面我们看看怎么设计这样的一个服务器

注:
1.为了只关注设计方面并且让代码尽量简洁,对于一些错误处理我删除了。
2.must_malloc,must_calloc和其他前缀为must的函数封装了相应的内存分配函数,使得分配必须成功;
3.queue_t结构是一个通用的任务队列结构,queue_new是创建新的队列,queue_push是推数据到队列中,queue_pop是从队列中抛出数据
4.下面的代码只是一个良好的服务器架构,而不是一个真正能使用的服务器
5.欢迎讨论和优化!
  1. typedef struct server_st *server_t;
  2. typedef struct task_queue_st *task_queue_t;
  3. typedef struct conn_st *conn_t;
  4. /** 服务器结构 */
  5. struct server_st {
  6.         /** 监听地址 */
  7.         struct in_addr listen_addr;
  8.         /** 监听端口 */
  9.         int port;
  10.         /** 监听描述符 */
  11.         int listen_fd;
  12.         /**负责读取请求的任务队列,一个线程负责处理一个任务队列,一共五个*/
  13.         queue_t readreq_task_queues[5];
  14.         /**负责写数据的任务队列,一个线程负责处理一个任务队列,一共五个*/
  15.         queue_t writedata_task_queues[5];
  16. };
  17. /** 任务队列结构 */
  18. struct task_queue_st {
  19.         /** 待添加的连接 */
  20.         queue_t pending_conn_queue;
  21.         /** 互斥锁,用于多线程环境下对任务队列的访问控制 */
  22.         pthread_mutex_t task_queue_mutex;
  23.         /** 链表结构,保存任务队列中的所有连接 */
  24.         conn_t conn_head;
  25.         /** 链表的最后一个节点,用于方便在结尾插入新的节点 */
  26.         conn_t conn_tail;
  27.         /** 总连接数*/
  28.         int total_conns;
  29. };
  30. /** 用户的阶段状态 */
  31. typedef enum {
  32.         status_ESTABLISHED,/** 客户端刚建立连接 */
  33.         status_CONNECTED,/** 客户端正在连接 */
  34.         status_START_RECV_DATA,/** 客户端开始接收数据,即表示服务器端开始发送数据 */
  35.         status_CLOSED/** 客户端关闭连接 */
  36. } client_status_t;
  37. /** 服务器的阶段状态 */
  38. typedef enum {
  39.         status_SERVER_CONNECTED,/** 与客户端建立连接 */
  40.         status_SERVER_CLOSING /** 服务器关闭连接 */
  41. } server_status_t;
  42. /** 连接结构 */
  43. struct conn_st {
  44.         /** 该连接的所属服务器 */
  45.         server_t server;
  46.         /** 该连接的引用计数 */
  47.         int ref_count;
  48.         /** 用户最后一次活动的时间*/
  49.         time_t last_activity;
  50.         /** 客户端当前的状态阶段*/
  51.         client_status_t client_status;
  52.         /** 服务器当前的状态阶段*/
  53.         server_status_t server_status;
  54.         /** socket描述符*/
  55.         int fd;
  56.   /** 数据缓冲,当服务器端需要不断的发送数据给客户端时被用到 */
  57.         char *remain_data;
  58.         /** 数据缓冲剩余数据大小 */
  59.         int remain_data_size;
  60.         /** 下一个连接 */
  61.         conn_t readreq_task_queue_next;
  62.         /** 下一个连接 */
  63.         conn_t writedata_task_queue_next;
  64. };
  65. /** 主函数 */
  66. int main(int argc, char **argv) {
  67.         int idx, conn_fd;
  68.         server_t server;
  69.         server = must_calloc(1, sizeof(struct server_st));
  70.         server->port = 端口号;
  71.         server->listen_addrs.s_addr = INADDR_ANY;
  72.         server->listen_fd = 服务器监听(&server->listen_addr,server->port);
  73.         设置成非堵塞(server->listen_fd);
  74.         /* 创建5个负责读取请求的任务线程 */
  75.         for (idx = 0; idx < 5; idx++) {
  76.                 task_queue_t task_queue = server->readreq_task_queues[idx];
  77.                 bzero(task_queue,sizeof(struct task_queue_st));
  78.                 task_queue->pending_conn_queue = queue_new();
  79.                 pthread_mutex_init(&task_queue->task_queue_mutex,NULL);
  80.                 pthread_create(&task_queue->thread_tid, NULL, readreq_task_thread, task_queue);
  81.         }
  82.         /* 创建5个负责写数据的任务线程 */
  83.         for (idx = 0; idx < 5; idx++) {
  84.                 task_queue_t task_queue = server->writedata_task_queues[idx];
  85.                 bzero(task_queue,sizeof(struct task_queue_st));
  86.                 task_queue->pending_conn_queue = queue_new();
  87.                 pthread_mutex_init(&task_queue->task_queue_mutex,NULL);
  88.                 pthread_create(&task_queue->thread_tid, NULL, writedata_task_thread, task_queue);
  89.         }
  90.         /* 无限循环的监听,等待客户端的连接请求 */
  91.         while (1) {
  92.                 conn_fd = accept(server->listen_fd, 0, 0);
  93.                 if (conn_fd < 0) continue;
  94.                 /* 和一个客户端建立了连接,下面开始把该连接分配给负责读取请求的任务线程*/
  95.                 assign_to_readreq_task_thread(server,conn_fd);
  96.         }
  97.         return 0;
  98. }
  99. /*分配任务,原则为哪个线程的任务队列最少就分配给哪一个线程去处理*/
  100. static void assign_to_readreq_task_thread(server_t server, int conn_fd) {
  101.         int idx;
  102.         task_queue_t task_queue, min_task_queue = NULL;
  103.         conn_t conn;
  104.         /* 查找连接数最少的任务队列 */
  105.         for (idx = 0; idx < 5; idx++) {
  106.                 task_queue = server->readreq_task_queues[idx];
  107.                 if (min_task_queue == NULL)
  108.                         min_task_queue = task_queue;
  109.                 else if (min_task_queue->total_conns > task_queue->total_conns)
  110.                         min_task_queue = task_queue;
  111.         }
  112.         if (min_task_queue) {
  113.                 conn = must_calloc(1, sizeof(struct conn_st));
  114.                 conn->server = server;
  115.                 conn->client_status = status_ESTABLISHED;
  116.                 conn->server_status = status_SERVER_CONNECTED;
  117.                 conn->ref_count++;/*累加引用计数*/
  118.                 /*添加到该任务队列中的待添加队列并且累加该任务队列里的连接数*/
  119.                 pthread_mutex_lock(&min_task_queue->task_queue_mutex);
  120.                 queue_push(min_task_queue->pending_conn_queue, conn, 1);
  121.                 min_task_queue->total_conns++;
  122.                 pthread_mutex_unlock(&min_task_queue->task_queue_mutex);
  123.         }
  124. }
  125. /**
  126. *负责读取请求的任务线程处理主函数,负责管理,处理自己队列中的所有连接
  127. */
  128. static void *readreq_task_thread(void *arg) {
  129.         int epfd = epoll_create(20480);
  130.         struct epoll_event ev,happened_ev[128];
  131.         task_queue_t task_queue = (task_queue_t) arg;
  132.         int ready,i;
  133.         while (1) {
  134.                 time_t curtime = time(NULL);
  135.                 /*下面三个变量遍历链表时使用,conn是当前连接,prev_conn是先前的连接,discarded_conn是丢弃的连接*/
  136.                 conn_t conn = NULL, prev_conn = NULL, discarded_conn;
  137.                 /*把待添加队列里的连接添加到任务队列中*/
  138.                 pthread_mutex_lock(&task_queue->task_queue_mutex);
  139.                 while ((conn = queue_pop(task_queue->pending_conn_queue))) {
  140.                         if (!task_queue->conn_head) {
  141.                                 task_queue->conn_head = conn;
  142.                         } else {
  143.                                 task_queue->conn_tail->readreq_task_queue_next = conn;
  144.                         }
  145.                         task_queue->conn_tail = conn;
  146.                 }
  147.                 pthread_mutex_unlock(&task_queue->task_queue_mutex);
  148.                 if (task_queue->total_conns == 0) {
  149.                         thread_sleep(1);
  150.                         continue;
  151.                 }
  152.                 conn = task_queue->conn_head;
  153.                 while (conn) {
  154.                         //清理已经关闭的连接,把已经关闭的连接和超时的连接从链表中删除掉
  155.                         boolean conn_timeout = conn->last_activity > 0 && curtime - conn->last_activity > CLIENT_TIMEOUT;
  156.                         if ((conn->client_status == status_CLOSED || conn->server_status == status_SERVER_CLOSING || conn_timeout) && conn->ref_count == 1) {
  157.                                 task_queue->total_conns--;
  158.                                 discarded_conn = conn;//先放入临时变量中
  159.                                 //从链表中删除
  160.                                 if (prev_conn) {
  161.                                         if (!conn->readreq_task_queue_next) {
  162.                                                 task_queue->conn_tail = prev_conn;
  163.                                                 prev_conn->readreq_task_queue_next = NULL;
  164.                                         } else {
  165.                                                 prev_conn->readreq_task_queue_next = conn->readreq_task_queue_next;
  166.                                         }
  167.                                 } else {
  168.                                         task_queue->conn_head = conn->readreq_task_queue_next;
  169.                                 }
  170.                                 conn = conn->readreq_task_queue_next;
  171.                                 close(discarded_conn->fd);/*真正关闭连接的地方*/
  172.                                 free(discarded_conn);
  173.                         } else {
  174.                                 ev.data.fd = conn->fd;
  175.                                 ev.data.ptr = conn;
  176.                                 /*只负责监视读事件和出错事件*/
  177.                                 ev.events = EPOLLIN | EPOLLET;
  178.                                 if(conn->client_status == status_ESTABLISHED) {
  179.                                         设置超时时间或者设置非堵塞(conn->fd);/*两者必须选一个*/
  180.                                         epoll_ctl(epfd,EPOLL_CTL_ADD,conn->fd,&ev);
  181.                                         conn->client_status = status_CONNECTED;
  182.                                 } else {
  183.                                         epoll_ctl(epfd,EPOLL_CTL_MOD,conn->fd,&ev);
  184.                                 }
  185.                                 prev_conn = conn;
  186.                                 conn = conn->readreq_task_queue_next;
  187.                         }
  188.                 }
  189.                 /** 堵塞等待,超时时间为POLL_TIMEOUT毫秒*/
  190.                 if ((ready = epoll_wait(epfd, happened_ev,sizeof(happened_ev) / sizeof(struct epoll_event), POLL_TIMEOUT)) < 0)
  191.                         continue;
  192.                 /**
  193.                  * 循环查找向我们发送请求的客户端的连接
  194.                  */
  195.                 for (i = 0; i < ready; i++) {
  196.                         if (happened_ev[i].events & POLLIN) {
  197.                                 conn = (conn_t) happened_ev[i].data.ptr;
  198.                                 conn->last_activity = time(NULL);/* 记录客户端的最后活动时间*/
  199.                                 proc_protocol(conn);/*处理协议函数*/
  200.                         } else if(happened_ev[i].events & EPOLLHUP) {
  201.                                 conn = (conn_t) happened_ev[i].data.ptr;
  202.                                 conn->client_status = status_CLOSED;
  203.                         }
  204.                 }
  205.         }
  206. }
  207. /* 协议处理函数 */
  208. void proc_protocol(conn_t conn) {
  209.         某一时刻客户端要求服务器端不断发送数据给客户端,然后设置了conn->client_status = status_START_RECV_DATA,并且调用了下面的move_to_writedata_task_thread(conn);
  210. }
  211. /*分配任务,原则为哪个线程的任务队列最少就分配给哪一个线程去处理*/
  212. static void move_to_writedata_task_thread(conn_t conn) {
  213.         server_t server = conn->server;
  214.         task_queue_t task_queue, min_task_queue= NULL;
  215.         int idx;
  216.         /* 查找连接数最少的任务队列 */
  217.         for (idx = 0; idx < 5; idx++) {
  218.                 task_queue = server->writedata_task_queues[idx];
  219.                 if (min_task_queue == NULL)
  220.                         min_task_queue = task_queue;
  221.                 else if (min_task_queue->total_conns > task_queue->total_conns)
  222.                         min_task_queue = task_queue;
  223.         }
  224.         if (min_task_queue) {
  225.                 conn->ref_count++;/*累加引用计数*/
  226.                 /*添加到该任务队列中的待添加队列并且累加该任务队列里的连接数*/
  227.                 pthread_mutex_lock(&min_task_queue->task_queue_mutex);
  228.                 queue_push(min_task_queue->pending_conn_queue, conn, 1);
  229.                 min_task_queue->total_conns++;
  230.                 pthread_mutex_unlock(&min_task_queue->task_queue_mutex);
  231.         }
  232. }
  233. /**
  234. *负责写数据的线程处理主函数,负责管理,处理自己队列中的所有连接
  235. */
  236. static void *writedata_task_thread(void *arg) {
  237.         task_queue_t task_queue = (task_queue_t) arg;
  238.         int epfd = epoll_create(20480);
  239.         struct epoll_event ev,happened_ev[128];
  240.         int ready,i,nwrite,nread,sndbuf_size = 10240;
  241.         char buf[sndbuf_size];
  242.         while (1) {
  243.                 time_t curtime = time(NULL);
  244.                 /*下面三个变量遍历链表时使用,conn是当前连接,prev_conn是先前的连接,discarded_conn是丢弃的连接*/
  245.                 conn_t conn = NULL, prev_conn = NULL, discarded_conn;
  246.                 /*把待添加队列里的连接添加到任务队列中*/
  247.                 pthread_mutex_lock(&task_queue->task_queue_mutex);
  248.                 while ((conn = queue_pop(task_queue->pending_conn_queue))) {
  249.                         if (!task_queue->conn_head) {
  250.                                 task_queue->conn_head = conn;
  251.                         } else {
  252.                                 task_queue->conn_tail->writedata_task_queue_next = conn;
  253.                         }
  254.                         task_queue->conn_tail = conn;
  255.                 }
  256.                 pthread_mutex_unlock(&task_queue->task_queue_mutex);
  257.                 if (task_queue->total_conns == 0) {
  258.                         thread_sleep(1);
  259.                         continue;
  260.                 }
  261.                 conn = task_queue->conn_head;
  262.                 while (conn) {
  263.                         //清理已经关闭的连接,把已经关闭的连接和超时的连接从链表中删除掉
  264.                         boolean conn_timeout = conn->last_activity > 0 && curtime - conn->last_activity > CLIENT_TIMEOUT;
  265.                         if (conn->client_status == status_CLOSED || conn->server_status == status_SERVER_CLOSING || conn_timeout) {
  266.                                 task_queue->total_conns--;
  267.                                 discarded_conn = conn;//放入临时变量中
  268.                                 //从链表中删除
  269.                                 if (prev_conn) {
  270.                                         if (!conn->writedata_task_queue_next) {
  271.                                                 task_queue->conn_tail = prev_conn;
  272.                                                 prev_conn->writedata_task_queue_next = NULL;
  273.                                         } else {
  274.                                                 prev_conn->writedata_task_queue_next = conn->writedata_task_queue_next;
  275.                                         }
  276.                                 } else {
  277.                                         task_queue->conn_head = conn->writedata_task_queue_next;
  278.                                 }
  279.                                 conn = conn->writedata_task_queue_next;
  280.                                 discarded_conn->ref_count--;/*引用计数减一,并且该连接已经关闭了,那么真正关闭连接的任务就交给readreq_task_thread函数了*/
  281.                         } else {
  282.                                 ev.data.fd = conn->fd;
  283.                                 ev.data.ptr = conn;
  284.                                 /*只负责监视写事件和出错事件*/
  285.                                 ev.events = EPOLLOUT | EPOLLET;
  286.                                 if(conn->client_status == status_START_RECV_DATA) {
  287.                                         setsockopt(conn->fd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_size, sizeof(int));
  288.                                         epoll_ctl(epfd,EPOLL_CTL_ADD,conn->fd,&ev);
  289.                                         conn->client_status = status_CONNECTED;
  290.                                 } else {
  291.                                         epoll_ctl(epfd,EPOLL_CTL_MOD,conn->fd,&ev);
  292.                                 }
  293.                                 prev_conn = conn;
  294.                                 conn = conn->writedata_task_queue_next;
  295.                         }
  296.                 }
  297.                 /**堵塞等待,超时时间为POLL_TIMEOUT毫秒*/
  298.                 if ((ready = epoll_wait(epfd, happened_ev,sizeof(happened_ev) / sizeof(struct epoll_event), POLL_TIMEOUT)) < 0)
  299.                         continue;
  300.                 /**
  301.                  * 循环查找向我们发送请求的客户端的连接
  302.                  */
  303.                 for (i = 0; i < ready; i++) {
  304.                         if (happened_ev[i].events & EPOLLOUT) {
  305.                                 conn = (conn_t) happened_ev[i].data.ptr;
  306.                                 conn->last_activity = time(NULL);/* 记录客户端的最后活动时间*/
  307.                                 if(conn->remain_data_size == 0) {/*如果数据缓存已经没有任何数据了的话*/
  308.                                         if((nread = read(文件描述符,buf,sizeof(buf))) > 0) {/*从硬盘读数据出来到数据缓存*/
  309.                                                 if((nwrite = write(conn->fd,buf,nread)) < 0 && errno != EINTR) {
  310.                                                         conn->client_status = status_CLOSED;
  311.                                                         continue;
  312.                                                 }
  313.                                                 if(nwrite < nread) {
  314.                                                         conn->remain_data_size = nread - nwrite;
  315.                                                         conn->remain_data = must_malloc(conn->remain_data_size);
  316.                                                         memcpy(conn->remain_data,buf + nwrite, conn->remain_data_size);
  317.                                                 }
  318.                                         } else if(nread == 0) {
  319.                                                 conn->server_status = status_SERVER_CLOSING;
  320.                                         }
  321.                                 } else {
  322.                                         /*写数据缓存里的数据到客户端*/
  323.                                         if((nwrite = write(conn->fd,conn->remain_data,conn->remain_data_size)) < 0 && errno != EINTR) {
  324.                                                 conn->client_status = status_CLOSED;
  325.                                                 free(conn->remain_data);
  326.                                                 continue;
  327.                                         }
  328.                                         if(nwrite < conn->remain_data_size) {
  329.                                                 conn->remain_data_size = conn->remain_data_size - nwrite;
  330.                                                 memcpy(buf, conn->remain_data + nwrite, conn->remain_data_size);
  331.                                                 conn->remain_data = must_realloc(conn->remain_data, conn->remain_data_size);
  332.                                                 memcpy(conn->remain_data, buf, conn->remain_data_size);
  333.                                         } else {
  334.                                                 free(conn->remain_data);
  335.                                                 conn->remain_data = NULL;
  336.                                                 conn->remain_data_size = 0;
  337.                                         }
  338.                                 }
  339.                         } else if(happened_ev[i].events & EPOLLHUP) {
  340.                                 conn = (conn_t) happened_ev[i].data.ptr;
  341.                                 conn->client_status = status_CLOSED;
  342.                         }
  343.                 }
  344.         }
  345. }
复制代码
发表于 2008-7-29 09:13:03 | 显示全部楼层
asert兄自己的流媒体服务器也是采用这个框架?
如果这个框架还能解决多进程的问题,类似apache那样,就更好了。
回复 支持 反对

使用道具 举报

 楼主| 发表于 2008-7-29 10:00:58 | 显示全部楼层
Post by realtang;1879672
asert兄自己的流媒体服务器也是采用这个框架?
如果这个框架还能解决多进程的问题,类似apache那样,就更好了。

这个框架是我这段时间一直做各种服务器开发总结出来的经验,以后要开发的流媒体服务器也会用这个框架的,不过现在还没有做流媒体服务器,正在学习av方面的知识。

你说的多进程的问题能再详细一点说说是什么问题吗?我觉得我的这个框架写的还不错,如果知道能有还需要改进的地方那我就又有趣的事可做了,呵呵
回复 支持 反对

使用道具 举报

发表于 2008-9-27 15:35:39 | 显示全部楼层
收藏,改天慢慢看!!
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

快速回复 返回顶部 返回列表