Reactor模型通过事件循环监听多个socket,使用epoll实现I/O多路复用,核心组件包括事件分发器、反应器和事件处理器,结合示例展示了高并发服务器中事件注册与回调处理机制。

在C++网络编程中,Reactor模型是一种高效的I/O多路复用设计模式,适用于高并发的服务器程序。它通过一个事件循环监听多个文件描述符(如socket),当某个描述符就绪时(可读、可写或出错),触发对应的事件处理器进行处理。
Reactor模型核心组件
一个典型的Reactor模型包含以下几个关键部分:
- EventDemultiplexer(事件多路分发器):通常是epoll(Linux)、kqueue(BSD/macOS)或select/poll,用于监听多个socket的事件。
- Reactor(反应器):运行事件循环,调用事件分发器等待事件,并将就绪的事件分发给注册的事件处理器。
-
EventHandler(事件处理器):用户定义的回调对象,包含
handle_read、handle_write等接口,由Reactor在事件发生时调用。 - ConcreteEventHandler:具体实现,比如处理客户端连接、接收数据等。
使用epoll实现简易Reactor
下面是一个基于Linux epoll 的简化版Reactor模型实现,展示基本结构和流程。
#include// reactor.cpp#include #include #include class Reactor { public: using EventCallback = std::function ; Reactor(); ~Reactor(); // 注册可读事件 void register_read(int fd, EventCallback cb); // 注册可写事件 void register_write(int fd, EventCallback cb); // 移除事件 void remove(int fd); // 事件循环 void run(); private: int epoll_fd_; std::unordered_map read_callbacks_; std::unordered_map write_callbacks_; };
#include "reactor.h" #include#include #include Reactor::Reactor() { epoll_fd_ = epoll_create1(0); if (epoll_fd_ == -1) { perror("epoll_create1"); exit(1); } } Reactor::~Reactor() { close(epoll_fd_); } void Reactor::register_read(int fd, EventCallback cb) { read_callbacks_[fd] = cb; struct epoll_event ev; ev.events = EPOLLIN | EPOLLET; // 边缘触发 ev.data.fd = fd; epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev); } void Reactor::register_write(int fd, EventCallback cb) { write_callbacks_[fd] = cb; struct epoll_event ev; ev.events = EPOLLOUT | EPOLLET; ev.data.fd = fd; epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev); } void Reactor::remove(int fd) { read_callbacks_.erase(fd); write_callbacks_.erase(fd); epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr); close(fd); } void Reactor::run() { const int MAX_EVENTS = 10; struct epoll_event events[MAX_EVENTS]; while (true) { int n = epoll_wait(epoll_fd_, events, MAX_EVENTS, -1); if (n == -1) { perror("epoll_wait"); break; } for (int i = 0; i < n; ++i) { int fd = events[i].data.fd; uint32_t events_mask = events[i].events; if (events_mask & EPOLLIN) { auto it = read_callbacks_.find(fd); if (it != read_callbacks_.end()) { it->second(fd); // 调用读回调 } } if (events_mask & EPOLLOUT) { auto it = write_callbacks_.find(fd); if (it != write_callbacks_.end()) { it->second(fd); // 调用写回调 } // 写事件通常处理完后移除,避免反复触发 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr); write_callbacks_.erase(fd); } if (events_mask & (EPOLLERR | EPOLLHUP)) { remove(fd); } } } }
简单TCP服务器示例
使用上面的Reactor创建一个回显服务器:
立即学习“C++免费学习笔记(深入)”;
#include "reactor.h" #include#include #include int create_server_socket(int port) { int sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); if (sockfd == -1) { perror("socket"); exit(1); } int opt = 1; setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_addr.s_addr = INADDR_ANY; addr.sin_port = htons(port); if (bind(sockfd, (struct sockaddr*)&addr, sizeof(addr)) == -1) { perror("bind"); exit(1); } if (listen(sockfd, SOMAXCONN) == -1) { perror("listen"); exit(1); } return sockfd; } int main() { Reactor reactor; int server_fd = create_server_socket(8888); // 监听新连接 reactor.register_read(server_fd, [&reactor](int fd) { while (true) { sockaddr_in client_addr{}; socklen_t len = sizeof(client_addr); int client_fd = accept4(fd, (struct sockaddr*)&client_addr, &len, SOCK_NONBLOCK); if (client_fd == -1) break; printf("Client connected: %d\n", client_fd); // 注册客户端可读事件 reactor.register_read(client_fd, [&reactor](int cfd) { char buf[1024]; ssize_t n = read(cfd, buf, sizeof(buf)); if (n > 0) { write(cfd, buf, n); // 回显 } else { reactor.remove(cfd); // 连接关闭或出错 } }); } }); printf("Server running on port 8888...\n"); reactor.run(); return 0; }
关键点说明
这个实现展示了Reactor的基本思想,实际生产中还需考虑以下几点:
- 使用边缘触发(ET)模式时,必须一次性读完所有数据,否则可能丢失事件。
- 需要管理缓冲区,支持非阻塞读写。
- 可以引入Channel类封装fd和事件,提高代码可维护性。
- 添加定时器机制,支持超时处理。
- 使用线程池处理耗时操作,避免阻塞事件循环。











