Web 服务器并发的三种方式

作者:Martin Kalin

多进程、多线程和事件驱动 I/O:Web 服务器中的权衡。

Web 服务器需要支持并发。服务器应以及时、公平的方式为客户端提供服务,以确保没有客户端因为某些其他客户端导致服务器挂起而处于饥饿状态。多进程和多线程以及它们的混合是实现并发的传统方法。Node.js 代表了另一种方式,它基于异步 I/O 的系统库,例如 epoll (Linux) 和 kqueue (FreeBSD)。为了突出这些方法之间的权衡,我编写了三个接近底层的 C 语言回显服务器:forking_server、threading_server 和 polling_server。

共享代码

Web 服务器使用 utils.c(清单 1)。函数 error_msg 打印消息并可选择终止服务器;announce_client 转储有关连接的信息;generate_echo_response 创建语法正确的 HTTP 响应。

= 清单 1. utils.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include "utils.h"

void error_msg(const char* msg, bool halt_flag) {
    perror(msg);
    if (halt_flag) exit(-1); 
}

/* listening socket */
int create_server_socket(bool non_blocking) {
  /* Modify as needed. */
  const int port = 3000;  
  
  struct sockaddr_in server_addr;
  
  /* create, bind, listen */
  int sock = socket(AF_INET,     /* family */
		    SOCK_STREAM, /* TCP */
		    0);          
  if (socket < 0) error_msg("Problem with socket call", true);

  /* non-blocking? */
  if (non_blocking) fcntl(sock, F_SETFL, O_NONBLOCK);
  
  /* bind */
  bzero(&server_addr, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = INADDR_ANY;
  server_addr.sin_port = htons(port); /* host to network endian */
  if (bind(sock, (struct sockaddr*) &server_addr, 
   ↪sizeof(server_addr)) < 0) 
    error_msg("Problem with bind call", true);

  /* listen */  
  fprintf(stderr, "Listening for requests on port %i...\n", port);
  if (listen(sock, BACKLOG) < 0)
    error_msg("Problem with listen call", true);

  return sock;
}

void announce_client(struct in_addr* addr) {
  char buffer[BUFF_SIZE + 1];

  inet_ntop(AF_INET, addr, buffer, sizeof(buffer));
  fprintf(stderr, "Client connected from %s...\n", buffer);
}

void generate_echo_response(char request[ ], char response[ ]) {
  strcpy(response, "HTTP/1.1 200 OK\n");        
  strcat(response, "Content-Type: text/*\n");
  strcat(response, "Accept-Ranges: bytes\n"); 
  strcat(response, "Connection: close\n\n");
  strcat(response, request);
}

中心函数是 create_server_socket,它创建一个阻塞或非阻塞监听套接字。此函数调用三个系统函数

  • socket — 创建套接字。

  • bind — 设置端口。

  • listen — 等待连接。

第一个调用创建一个基于 TCP 的套接字,然后 bind 调用指定 Web 服务器等待连接的端口号。listen 调用开始等待最多 BACKLOG 个连接


if (listen(sock, BACKLOG) < 0) /* BACKLOG == 12 */
  error_msg("Problem with listen call", true);

多进程服务器

清单 2 中的 forking_server 通过多进程支持并发,早期 Web 服务器(例如 Apache 1)过去常常使用这种方法来启动作为 C 或 Perl 脚本编写的 Web 应用程序。单独的进程处理单独的连接。尽管现代服务器(例如 Apache 2)通常结合了多进程和多线程,但这种方法仍然不过时。

清单 2. forking_server.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <signal.h>
#include "utils.h"

int main() {
  /* Avoid zombies. */
  signal(SIGCHLD, SIG_IGN);

  char buffer[BUFF_SIZE + 1];      

  struct sockaddr_in client_addr;
  socklen_t len = sizeof(struct sockaddr_in);

  /* listening socket */
  int sock = create_server_socket(false);

  /* connections + requests */
  while (true) {
     int client = accept(sock, 
			(struct sockaddr*) &client_addr, 
			&len);
    if (client < 0) error_msg("Problem with accept call", true);

    announce_client(&client_addr.sin_addr);

    /* fork child */
    pid_t pid = fork();
    if (pid < 0) error_msg("Problem with fork call", false);

    /* 0 to child, child's PID to parent */
    if (0 == pid) {  /** child **/
      close(sock);   /* child's listening socket */

      /* request */
      bzero(buffer, sizeof(buffer));
      int bytes_read = recv(client, buffer, sizeof(buffer), 0); 
      if (bytes_read < 0) error_msg("Problem with 
          ↪recv call", false);
      
      /* response */
      char response[BUFF_SIZE * 2]; 
      bzero(response, sizeof(response));
      generate_echo_response(buffer, response);
      int bytes_written = send(client, response, 
        ↪strlen(response), 0); 
      if (bytes_written < 0) error_msg("Problem with 
         ↪send call", false);

      close(client); 
      exit(0);       /* terminate */
    } 
    else             /** parent **/
      close(client); /* parent's read/write socket. */
  } 

  return 0; 
}

forking_server 在父进程和尽可能多的子进程(有多少连接的客户端就有多少子进程)之间分配工作。客户端处于活动状态,直到连接关闭,连接关闭结束会话。

父进程从第一条指令开始执行 main。父进程监听连接,并为每个连接

  • 生成一个新进程来处理连接。

  • 继续监听其他连接。

以下是关键代码段


pid_t pid = fork(); /* spawn child */
if (0 == pid) {     /* child */
   close(sock);     /* close inherited listening socket */
   /* handle request and terminate */
   ...
}               
else                /* parent */
  close(client);    /* close client, resume listening */

父进程执行对 fork 的调用。如果调用成功,fork 返回一个非负整数:0 给 fork 的子进程,子进程的进程标识符给父进程。子进程继承父进程的打开套接字描述符,这解释了 if-else 结构

  • if 子句:子进程关闭其监听套接字的副本,因为接受客户端是父进程的工作。子进程处理客户端的请求,然后通过调用 exit 终止。

  • else 子句:父进程关闭客户端套接字,因为 fork 的子进程处理客户端。父进程继续监听连接。

创建和销毁进程的开销很大。诸如 FastCGI 之类的模块通过预先 fork 来补救这种低效率。在启动时,FastCGI 创建一个可重用的客户端处理进程池。

然而,效率低下仍然存在。当一个进程抢占另一个进程时,会发生上下文切换,导致系统工作以确保切换进和切换出的进程行为正确。内核维护每个进程的上下文信息,以便抢占的进程可以重新启动。上下文的三个主要结构是

  • 页表:将虚拟地址映射到物理地址。

  • 进程表:存储重要信息。

  • 文件表:跟踪进程的打开文件。

系统花费在上下文切换上的 CPU 周期不能用于 Web 服务器等应用程序。尽管测量上下文切换的延迟并非易事,但每次切换 5 毫秒到 10 毫秒是一个大致的范围,甚至是一个乐观的范围。预先 fork 减轻了进程创建和销毁的低效率,但并未消除上下文切换。

多进程有什么好处?进程结构使程序员无需同步对共享内存位置的并发访问。例如,想象一个 Web 应用程序,它让用户玩一个简单的文字游戏。应用程序显示打乱的字母,例如kcddoe,玩家尝试解开字母以形成一个单词——在本例中是docked。这是一个单人游戏,应用程序必须跟踪游戏的状态:要解开的字符串、玩家一次移动一个字母以及其他操作。假设有一个全局变量


typedef struct {
  /* variables to track game's state */
} WordGame;
WordGame game; /* single, global instance */

以便应用程序在源代码中只有一个 WordGame 实例在应用程序的函数(例如,move_letter、submit_guess)中可见。并发玩家需要单独的 WordGame,以便一个玩家不会干扰另一个玩家的游戏。

现在,考虑两个子进程 C1 和 C2,每个子进程处理一个玩家。在 Linux 下,fork 的子进程继承其父进程的地址空间;因此,C1 和 C2 开始时彼此及其父进程具有相同的地址空间。如果这三个进程之后都没有执行写操作,则不会造成危害。对单独地址空间的需求首先出现在写操作中。因此,Linux 对内存页强制执行写时复制 (COW) 策略。如果 C1 或 C2 对继承的页执行写操作,则此子进程会获得其自己的页副本,并且子进程的页表会更新。因此,实际上,父进程和 fork 的子进程具有单独的地址空间,并且每个客户端实际上都拥有其可写 WordGame 的副本。

多线程服务器

清单 3 中所示的 multithreading_server 避免了 forking_server 的上下文切换缺点,但面临着自身的挑战。每个进程至少有一个执行线程。单个多线程进程有多个线程。threading_server 是多线程的。

清单 3. threading_server.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <signal.h>
#include <pthread.h>
#include "utils.h"

/* thread routine */
void* handle_client(void* client_ptr) {
  pthread_detach(pthread_self()); /* terminates on return */

  /* read/write socket */
  int client = *((int*) client_ptr);

  /* request */
  char buffer[BUFF_SIZE + 1];
  bzero(buffer, sizeof(buffer));
  int bytes_read = recv(client, buffer, sizeof(buffer), 0); 
  if (bytes_read < 0) error_msg("Problem with recv call", false);

  /* response */
  char response[BUFF_SIZE * 2]; 
  bzero(response, sizeof(response));
  generate_echo_response(buffer, response);
  int bytes_written = send(client, response, strlen(response), 0); 
  if (bytes_written < 0) error_msg("Problem with send call", false);

  close(client); 

  return NULL;
} /* detached thread terminates on return */

int main() {  
  char buffer[BUFF_SIZE + 1];      
  
  struct sockaddr_in client_addr;
  socklen_t len = sizeof(struct sockaddr_in);

  /* listening socket */
  int sock = create_server_socket(false);

  /* connections */
  while (true) {
    int client = accept(sock, 
			(struct sockaddr*) &client_addr, 
			&len);
    if (client < 0) error_msg("Problem accepting a 
        ↪client request", true);

    announce_client(&client_addr.sin_addr);

    /* client handler */
    pthread_t tid;
    int flag = pthread_create(&tid,          /* id */
			      NULL,          /* attributes */
			      handle_client, /* routine */
			      &client);      /* routine's arg */
    if (flag < 0) error_msg("Problem creating thread", false);
  } 

  return 0; 
}

threading_server 模仿 forking_server 中的分工策略,但客户端处理程序现在是单个进程内的线程,而不是 fork 的子进程。这种差异是巨大的。由于 COW,单独的进程实际上具有单独的地址空间,但进程内的单独线程共享一个地址空间。

当客户端连接时,threading_server 将处理委托给新线程


pthread_create(&tid,          /* id */
	       NULL,          /* attributes */
               handle_client, /* routine */
               &client);      /* arg to routine */

该线程获得唯一标识符并执行线程例程——在本例中为 handle_client。threading_server 将客户端套接字传递给线程例程,该例程从客户端读取和写入客户端。

WordGame 如何移植到 forking_server?此服务器必须确保每个客户端一个 WordGame 实例。单个 WordGame


WordGame game; /* one instance */

可以变成这些数组


WordGame games[BACKLOG]; /* BACKLOG == max clients */

当客户端连接时,threading_server 可以搜索可用的游戏实例并将此实例传递给客户端处理线程


int game_index = get_open_game(); /* called in main so thread safe */

在函数 main 中,threading_server 将调用 get_open_game,然后每个客户端处理线程都可以访问其自己的 WordGame 实例


games[game_index].socket = client;   
pthread_create(&tid,                 /* id */
               NULL,                 /* attributes */
               handle_client,        /* routine */
               &games[game_index]);  /* WordGame arg */

线程例程本地的 WordGame 也将起作用


void* handle_client(void* client_ptr) {
  WordGame game; /* local so thread safe */
  /* ... */
}

每个线程都获得其自己的本地副本,这些副本因此是线程安全的。重要的是,程序员而不是系统确保每个客户端一个 WordGame。

对于线程池,threading_server 将更有效率。FastCGI 中用于进程的预先 fork 策略很好地扩展到线程。

轮询服务器

清单 4 是 polling_server,它在某些方面类似于 forking_server,而在其他方面类似于 threading_server。

清单 4. polling_server.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <signal.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include "utils.h"

#define MAX_BUFFERS (BACKLOG + 1) /* max clients + listener */

int main() {
  char buffer[BUFF_SIZE + 1];      

  /* epoll structures */
  struct epoll_event event,     /* server2epoll */
    event_buffers[MAX_BUFFERS]; /* epoll2server */
  
  int epollfd = epoll_create(MAX_BUFFERS); /* arg just a hint */ 
  if (epollfd &lt; 0) error_msg("Problem with epoll_create", 
      ↪true);

  struct sockaddr_in client_addr;
  socklen_t len = sizeof(struct sockaddr_in);

  int sock = create_server_socket(true); /* non-blocking */

  /* polling */
  event.events = EPOLLIN | EPOLLET; /* incoming, edge-triggered */
  event.data.fd = sock;             /* register listener */
  if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sock, &event) < 0) 
    error_msg("Problem with epoll_ctl call", true);
  
  /* connections + requests */
  while (true) {
    /* event count */
    int n = epoll_wait(epollfd, event_buffers, MAX_BUFFERS, -1);
    if (n < 0) error_msg("Problem with epoll_wait call", true);

    /*
       -- If connection, add to polling: may be none or more
       -- If request, read and echo 
    */
    int i;
    for (i = 0; i < n; i++) {
      /* listener? */
      if (event_buffers[i].data.fd == sock) {
	while (true) {
	  socklen_t len = sizeof(client_addr);
	  int client = accept(sock,
			      (struct sockaddr *) &client_addr, 
			      &len);

	  /* no client? */
	  if (client < 0 && (EAGAIN == errno || 
              ↪EWOULDBLOCK == errno)) break;
	  
	  /* client */
	  fcntl(client, F_SETFL, O_NONBLOCK); /* non-blocking */
	  event.events = EPOLLIN | EPOLLET;   /* incoming, 
                                                 edge-triggered */
	  event.data.fd = client;
	  if (epoll_ctl(epollfd, EPOLL_CTL_ADD, client, &event) < 0)
	    error_msg("Problem with epoll_ctl ADD call", false);	  
	  
	  announce_client(&client_addr.sin_addr);
	}
      }
      /* request */
      else {
	bzero(buffer, sizeof(buffer));
	int bytes_read = recv(event_buffers[i].data.fd, buffer,
        ↪sizeof(buffer), 0); 

	/* echo request */
	if (bytes_read < 0) {
	  char response[BUFF_SIZE * 2]; 
	  bzero(response, sizeof(response));
	  generate_echo_response(buffer, response);
	  int bytes_written = 
	    send(event_buffers[i].data.fd, response, 
            ↪strlen(response), 0); 
	  if (bytes_written < 0) error_msg("Problem with 
              ↪send call", false);
	
	  close(event_buffers[i].data.fd); /* epoll stops 
                                              polling fd */
	}  
      } 
    } 
  } 

  return 0;
}

polling_server 很复杂


while (true)         /* listening loop */
  for (...)          /* event loop */
     if (...)        /* accepting event? */
       while (true)  /* accepting loop  */
     else            /* request event */

此服务器作为一个进程中的一个线程执行,因此必须通过从一项任务(例如,接受连接)快速跳转到另一项任务(例如,读取请求)来支持并发。这些敏捷的跳转是非阻塞 I/O 操作的一部分,特别是对 accept(连接)和 recv(请求)的调用。

polling_server 对 accept 的调用立即返回

  • 如果没有客户端等待连接,服务器将继续检查是否有要读取的请求。

  • 如果有等待的客户端,polling_server 会在一个循环中接受它们。

polling_server 使用 epoll 系统库,声明单个 epoll 结构和这些数组


struct epoll_event              
      event,                     /* from server to epoll */
      event_buffers[MAX_EVENTS]; /* from epoll to server */

服务器使用单个结构来注册对监听套接字上的连接和客户端套接字上的请求的兴趣。epoll 库使用 epoll 结构数组来记录此类事件。分工是

  • polling_server 向 epoll 注册感兴趣的事件。

  • epoll 库在 epoll_event 结构中记录检测到的事件。

  • polling_server 处理 epoll 检测到的事件。

polling_server 对传入 (EPOLLIN) 事件和边沿触发 (EPOLLET) 而不是电平触发事件感兴趣。这种区别来自数字逻辑设计,但例子比比皆是。红色交通信号灯是一个电平触发事件,表示车辆应保持停止状态,而从绿色到红色的过渡是一个边沿触发事件,表示车辆应停车。polling_server 对首次发生的连接和请求事件感兴趣。

for 循环遍历检测到的事件。在循环之上,语句


int n = epoll_wait(epollfd, event_buffers, MAX_EVENTS, -1);

获取事件计数,其中事件是连接或请求。

我的 polling_server 采取了捷径。当服务器读取请求时,它仅读取当时可用的字节。但是,服务器可能需要多次读取才能获得完整的请求;因此,服务器应缓冲部分请求,直到请求完成。我将此修复留给读者作为练习。

WordGame 如何移植到 polling_server?与 threading_server 一样,此服务器必须确保每个客户端一个 WordGame 实例,并且必须协调客户端对其 WordGame 的访问。从好的方面来说,polling_server 是单线程的,因此是线程安全的。与 forking_server 不同,polling_server 不会产生 fork 子进程之间的上下文切换成本。

结论

哪种是客户端并发的最佳方式?合理的答案必须考虑传统的多进程和多线程,以及它们的混合。epoll 例证的事件驱动 I/O方式也值得研究。最终,选择的方法必须满足在真实世界的条件下支持真实世界 Web 应用程序的并发挑战。

资源

三个 Web 服务器以及 iterative_server 可在 http://condor.depaul.edu/mkalin 获取。

有关 Node.js 的更多信息,请参阅:https://node.org.cn

加载 Disqus 评论