Reactor模型通过事件循环监听多个socket,使用epoll实现I/O多路复用,核心组件包括事件分发器、反应器和事件处理器,结合示例展示了高并发服务器中事件注册与回调处理机制。

在C++网络编程中,Reactor模型是一种高效的I/O多路复用设计模式,适用于高并发的服务器程序。它通过一个事件循环监听多个文件描述符(如socket),当某个描述符就绪时(可读、可写或出错),触发对应的事件处理器进行处理。
Reactor模型核心组件
一个典型的Reactor模型包含以下几个关键部分:
- EventDemultiplexer(事件多路分发器):通常是epoll(Linux)、kqueue(BSD/macOS)或select/poll,用于监听多个socket的事件。
- Reactor(反应器):运行事件循环,调用事件分发器等待事件,并将就绪的事件分发给注册的事件处理器。
-
EventHandler(事件处理器):用户定义的回调对象,包含
handle_read、handle_write等接口,由Reactor在事件发生时调用。 - ConcreteEventHandler:具体实现,比如处理客户端连接、接收数据等。
使用epoll实现简易Reactor
下面是一个基于Linux epoll 的简化版Reactor模型实现,展示基本结构和流程。
#include <sys/epoll.h>
#include <functional>
#include <unordered_map>
#include <vector>
class Reactor {
public:
using EventCallback = std::function<void(int)>;
Reactor();
~Reactor();
// 注册可读事件
void register_read(int fd, EventCallback cb);
// 注册可写事件
void register_write(int fd, EventCallback cb);
// 移除事件
void remove(int fd);
// 事件循环
void run();
private:
int epoll_fd_;
std::unordered_map<int, EventCallback> read_callbacks_;
std::unordered_map<int, EventCallback> write_callbacks_;
};
// reactor.cpp
#include "reactor.h"
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
Reactor::Reactor() {
epoll_fd_ = epoll_create1(0);
if (epoll_fd_ == -1) {
perror("epoll_create1");
exit(1);
}
}
Reactor::~Reactor() {
close(epoll_fd_);
}
void Reactor::register_read(int fd, EventCallback cb) {
read_callbacks_[fd] = cb;
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 边缘触发
ev.data.fd = fd;
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
}
void Reactor::register_write(int fd, EventCallback cb) {
write_callbacks_[fd] = cb;
struct epoll_event ev;
ev.events = EPOLLOUT | EPOLLET;
ev.data.fd = fd;
epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev);
}
void Reactor::remove(int fd) {
read_callbacks_.erase(fd);
write_callbacks_.erase(fd);
epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
close(fd);
}
void Reactor::run() {
const int MAX_EVENTS = 10;
struct epoll_event events[MAX_EVENTS];
while (true) {
int n = epoll_wait(epoll_fd_, events, MAX_EVENTS, -1);
if (n == -1) {
perror("epoll_wait");
break;
}
for (int i = 0; i < n; ++i) {
int fd = events[i].data.fd;
uint32_t events_mask = events[i].events;
if (events_mask & EPOLLIN) {
auto it = read_callbacks_.find(fd);
if (it != read_callbacks_.end()) {
it->second(fd); // 调用读回调
}
}
if (events_mask & EPOLLOUT) {
auto it = write_callbacks_.find(fd);
if (it != write_callbacks_.end()) {
it->second(fd); // 调用写回调
}
// 写事件通常处理完后移除,避免反复触发
epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
write_callbacks_.erase(fd);
}
if (events_mask & (EPOLLERR | EPOLLHUP)) {
remove(fd);
}
}
}
}
简单TCP服务器示例
使用上面的Reactor创建一个回显服务器:
立即学习“C++免费学习笔记(深入)”;
#include "reactor.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <cstring>
int create_server_socket(int port) {
int sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (sockfd == -1) {
perror("socket");
exit(1);
}
int opt = 1;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(port);
if (bind(sockfd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
perror("bind");
exit(1);
}
if (listen(sockfd, SOMAXCONN) == -1) {
perror("listen");
exit(1);
}
return sockfd;
}
int main() {
Reactor reactor;
int server_fd = create_server_socket(8888);
// 监听新连接
reactor.register_read(server_fd, [&reactor](int fd) {
while (true) {
sockaddr_in client_addr{};
socklen_t len = sizeof(client_addr);
int client_fd = accept4(fd, (struct sockaddr*)&client_addr, &len, SOCK_NONBLOCK);
if (client_fd == -1) break;
printf("Client connected: %d\n", client_fd);
// 注册客户端可读事件
reactor.register_read(client_fd, [&reactor](int cfd) {
char buf[1024];
ssize_t n = read(cfd, buf, sizeof(buf));
if (n > 0) {
write(cfd, buf, n); // 回显
} else {
reactor.remove(cfd); // 连接关闭或出错
}
});
}
});
printf("Server running on port 8888...\n");
reactor.run();
return 0;
}
关键点说明
这个实现展示了Reactor的基本思想,实际生产中还需考虑以下几点:
- 使用边缘触发(ET)模式时,必须一次性读完所有数据,否则可能丢失事件。
- 需要管理缓冲区,支持非阻塞读写。
- 可以引入Channel类封装fd和事件,提高代码可维护性。
- 添加定时器机制,支持超时处理。
- 使用线程池处理耗时操作,避免阻塞事件循环。











