KV存储是一种常见的数据存储模型,广泛应用于缓存、配置管理、计数器、队列以及分布式锁等场景。例如,Redis 就是典型的 KV 存储系统,常用于网络服务中的高速缓存。
本项目实现了一个轻量级的远程 KV 存储服务,功能类似于 Redis,支持基本的数据读写操作,并具备良好的扩展性与高性能处理能力。
编程环境:
部署于 2G 内存、2 核 CPU 的 Linux 云服务器,通过 Xshell 与 VSCode 进行远程连接开发,使用 Makefile 构建整个项目。
技术选型:
C/C++ 基础语言,结合 C++11 特性(如智能指针、lambda 表达式、function 函数包装器),采用 Socket 网络编程,基于 epoll 的 ET 模式配合 Reactor 设计模式,整体架构中引入策略模式,底层涉及数组、哈希表、红黑树和链表等多种基础数据结构。
项目遵循高内聚、低耦合的设计原则,将系统划分为多个独立模块,便于维护与扩展。
负责管理客户端的并发连接,接收来自客户端的请求并转交给协议解析模块,同时将处理结果回传给客户端。该模块基于 epoll + ET(边缘触发)模式,结合 Reactor 模式实现高效的 I/O 多路复用机制。每个文件描述符(fd)均注册了对应的回调函数,以便在事件就绪时快速响应。
该模块依据自定义通信协议对接收到的原始数据进行解析,验证请求合法性,对符合格式的数据进行反序列化,生成标准请求对象后传递给数据存储引擎模块。当存储层返回结果后,再将其封装并通过网络模块发送回客户端。
根据上层解析模块传来的指令执行具体操作。当前支持多种数据结构作为底层存储引擎,包括数组、哈希表、红黑树及 LRU 缓存机制。通过策略模式,在服务启动时可动态选择合适的存储方案以适应不同应用场景。
支持的核心命令如下:
对原始 Socket 接口进行了封装,提升代码可读性和复用性。主要封装了 socket、bind、listen 和 accept 等系统调用。
由于项目采用 epoll 的 ET 模式,因此在创建监听套接字 listenfd 时需设置为非阻塞模式(SOCK_NONBLOCK),避免因频繁唤醒导致性能下降。此外,ET 模式下必须正确处理 EAGAIN 或 EINTR 错误码,故 accept 操作需返回错误状态供上层判断。
#pragma once
#include <iostream>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <cstring>
const int gbacklog = 128;
class mySocket
{
public:
// 1.构建tcp socketfd
static int creatSockfd()
{
// 创建socketfd
int sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
return sockfd;
}
// 2.bind绑定端口
static void Bind(int sockfd, int port)
{
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(serveraddr));
// 设置地址的信息(协议,ip,端口)
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 绑定任意网卡ip,通常我们访问某一个IP地址是这个服务器的公网网卡IP地址
serveraddr.sin_port = htons(port); // 注意端口16位,2字节需要使用htons。不可使用htonl
if (bind(sockfd, (const sockaddr *)(&serveraddr), sizeof(serveraddr)) < 0)
{
perror("sock bind err");
exit(-1);
}
std::cout << "sock bind success" << std::endl;
}
// 3. listen监听,让打开的sock这个"文件"去监听来自网络的请求。用于获取新的网络连接
static void Listen(int sockfd, int maxaccept)
{
if (listen(sockfd, maxaccept) == -1)
{
perror("sock listen err");
exit(-1);
}
std::cout << "sock listen success" << std::endl;
}
// 4 accept创建sockfd用于传输数据
static int Accept(int listenfd, std::string &clientIp, uint16_t &clientPort, int &err)
{
// 获取新fd用于通信
struct sockaddr_in clientaddr;
memset(&clientaddr, 0, sizeof(clientaddr));
socklen_t len = sizeof(clientaddr);
// std::cout << "accept start " << listenfd << std::endl;
int sockfd = accept(listenfd, (struct sockaddr *)&clientaddr, &len);
// 需要处理错误 EAGAIN 和 EINTER
err = errno;
clientIp = inet_ntoa(clientaddr.sin_addr);
clientPort = ntohs(clientaddr.sin_port);
return sockfd;
}
};
用于管理每一个连接的 fd 及其关联的读写回调函数,同时维护每个连接的输入输出缓冲区(inBuffer/outBuffer)。该结构体充当了网络层与协议解析层之间的桥梁。
当某个 fd 被 epoll 检测到就绪后,可通过查找对应 connItem 实例,调用已注册的 reader、sender 或 accepter 方法进行处理。
#pragma once
#include <string>
#include <functional>
// 管理网络连接信息和缓冲区的结构体
class tcpServer;
struct connItem;
using func_t = std::function<void(connItem *)>; // 使用函数包装器,当然也可以使用函数指针
// typedef void (*func_t)(connItem *);
struct connItem
{
// 构造函数
connItem(int sockfd = -1, tcpServer *tcsvptr = nullptr)
: _sockfd(sockfd), _tcsvptr(tcsvptr) {}
// 用于注册该连接的对应的读写异常回调方法
void Register(func_t recver = nullptr, func_t sender = nullptr, func_t execpter = nullptr)
{
_recver = recver;
_sender = sender;
_execpter = execpter;
}
// 文件描述符和读写缓冲区
std::string _inbuffer;
std::string _outbuffer;
// 这个连接对应的读写异常方法
func_t _recver;
func_t _sender;
func_t _execpter;
// 执行服务器的回调指针
tcpServer *_tcsvptr;
int _sockfd;
};
TcpServer 是整个服务器的核心控制类,包含以下关键成员变量:
// 如果要同时监听多个端口,就需要维护每一个sockfd与对应端口的信息
std::unordered_map<int, int> _listensock_fds;
int _epfd; // epollfd
epoll_event *_revents; // 返回事件的列表
std::unordered_map<int, connItem *> _connlist{}; // 用于快速查找fd和对于的连接结构体conn
func_t _service = kvstoreTask; // 处理kv请求和响应的函数
初始化相关函数包括 addListenPort 与 init:
// 增加监听的端口
void addListenPort(int port)
{
// 服务器初始化,在creat中已经设置为非阻塞了
int listensock = mySocket::creatSockfd();
// 设置端口复用,保证服务器退出后能够快速bind
int opt = 1;
setsockopt(listensock, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
mySocket::Bind(listensock, port);
mySocket::Listen(listensock, gbacklog);
// 将listensock 与 对应端口添加到哈希表中
_listensock_fds.insert(std::make_pair(listensock, port));
}
// 初始化到监听即可
void init()
{
// epoll初始化,返回事件集合初始化
_epfd = epoll_create(1);
_revents = new struct epoll_event[fdnums];
// 遍历哈希表,将所有的listenfd 与 port进行 关心
if (_listensock_fds.empty())
{
std::cerr << "没有设置监听端口!" << std::endl;
exit(-1);
}
for (auto &kv : _listensock_fds)
{
AddConnList(kv.first, EPOLLIN | EPOLLET, [this](connItem *conn)
{ this->Accepter(conn); }, nullptr, nullptr);
}
}
事件派发主循环:
// 事件派发器
void Dispatcher()
{
printf("Dispatcher start\n");
while (true)
{
int n = epoll_wait(_epfd, _revents, fdnums, -1);
// 遍历就绪队列,epoll只会返回真的就绪的事件。不会返回无效事件,减少遍历
for (int i = 0; i < n; i++)
{
int connfd = _revents[i].data.fd;
uint32_t events = _revents[i].events;
// 哈希表中该连接没有删除
if (_connlist.count(connfd))
{
// 不要使用if elseif else,因为同一个事件有可能读写事件都就绪了
if ((events & EPOLLIN) && _connlist[connfd]->_recver != nullptr) // 回调执行fd对应读事件
_connlist[connfd]->_recver(_connlist[connfd]);
if ((events & EPOLLOUT) && _connlist[connfd]->_sender != nullptr) // 回调执行fd对应写事件
_connlist[connfd]->_sender(_connlist[connfd]);
}
}
}
printf("Dispatcher over\n");
}
这是服务器的主运行循环,持续调用 epoll_wait 获取就绪事件。对于每一个返回的 fd,先在哈希表中查找是否存在对应连接记录;若存在,则调用其 connItem 中预设的方法——Accepter 处理新连接,Recver 负责接收数据,Sender 发送响应数据。
在每个方法的实现中,必须对 EAGAIN 和 EINTR 异常进行处理。当遇到 EAGAIN 时,应直接跳出当前流程;而若捕获到 EINTR,则需重新尝试操作,即执行 continue 操作。
在 Reader 方法中,完成数据读取后需要调用 _service(conn),将数据交由解析层进行后续处理。发送数据(send)完成后,需重新注册该连接对应的事件监听。读事件始终需要关注,而是否关注写事件则取决于输出缓冲区中是否存在待发送的数据。
// listenfd 触发EOILLIN执行
void Accepter(connItem *conn)
{
// 1.获取新连接的fd,注意ET模式下,需要死循环一次性将所有数据读取完毕。否则会出现问题
// printf("Accepter start\n");
while (true)
{
std::string clientip;
uint16_t clientport;
int err = 0;
int clientsock = mySocket::Accept(conn->_sockfd, clientip, clientport, err);
// 2.构建新连接的信息表,让epoll关心该事件同时并通过哈希表进行管理。需要使用lambda进行处理类内回调函数
if (clientsock > 0)
{
AddConnList(clientsock, EPOLLIN | EPOLLET, [this](connItem *conn)
{ this->Reader(conn); }, [this](connItem *conn)
{ this->Sender(conn); }, nullptr); // 这里暂时不处理异常事件
// 这里可以给每一个连接客户端发送一份使用说明
printf("Get a new link, info [%s:%d] clientsock[%d]\n", clientip.data(), clientport, clientsock);
}
else
{
// 处理EAGAIN等异常信号
if (err == EAGAIN || err == EWOULDBLOCK)
{
// 没有连接了
// printf("DEBUG Accepter EAGAIN 没有更多连接,此次获取连接结束\n");
break;
}
else if (err == EINTR)
{
// printf("DEBUG Accepter EINTR 还有更多连接需要处理\n");
continue;
}
else
{
// printf("ERRNO Accepter 建立连接失败\n");
break;
}
}
}
// printf("Accepter over\n");
}
// clientfd触发EOILLIN执行
void Reader(connItem *conn)
{
char buffer[1024];
while (true)
{
int count = recv(conn->_sockfd, buffer, sizeof(buffer) - 1, 0);
if (count > 0)
{
buffer[count] = 0;
conn->_inbuffer += buffer;
}
else if (count == 0)
{
// printf("DEBUG Reader recv over\n");
RemoveConn(conn->_sockfd);
return;
}
else
{
// 同理需要处理EAGAIN和EINTER
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// 无数据可读
// printf("DEBUG Reader EAGAIN\n");
break;
}
else if (errno == EINTR)
{
// printf("DEBUG Reader EINTR\n");
continue;
}
else
{
// printf("ERRNO Reader 建立连接失败\n");
// 关闭套接字和取消epoll关心,然后退出
RemoveConn(conn->_sockfd);
return;
}
}
}
// 接收数据之后,进行解析处理。这里目前只是简单处理,还能进一步优化
// printf("处理客户端请求开始\n");
_service(conn);
// printf("处理客户端请求结束\n");
}
// clientfd触发EOILLOUT执行
void Sender(connItem *conn)
{
while (true)
{
int count = send(conn->_sockfd, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);
if (count > 0)
{
// 清空发送的数据,可以进一步优化
conn->_outbuffer.erase(0, count);
// 数据发送完毕
if (conn->_outbuffer.empty())
{
// 此时不可以直接更改事件的关系,因为数据可能还在内核,没有发送到网络
// printf("DEBUG Senderr send over\n");
break;
}
}
else if (count == 0)
{
// 没有数据发送
RemoveConn(conn->_sockfd);
return;
}
else
{
// 同理需要处理EAGAIN和EINTER
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
else if (errno == EINTR)
continue;
else
{
// 关闭套接字和取消epoll关心,然后退出
RemoveConn(conn->_sockfd);
return;
}
}
}
// 到这里,事件结束了,数据才是真的发送出去了通重新关心该事件的读写
if (conn->_outbuffer.empty())
SetEvent(conn->_sockfd, EPOLLIN | EPOLLET, EVENT_MOD);
else
SetEvent(conn->_sockfd, EPOLLET | EPOLLIN | EPOLLOUT, EVENT_MOD);
}
AddConnList:用于初始化连接对象 conn,注册其事件回调,并将其插入哈希表中以便统一管理。
SetEvent:通过 epoll_ctl 接口来添加、修改或删除指定文件描述符(fd)所关注的事件类型,如 EPOLLIN 或 EPOLLOUT。
RemoveConn:负责销毁与连接相关的资源,包括关闭文件描述符和释放内存(close 和 delete)。
// 初始化连接信息和方法,注册到epoll关心列表中,并放入连接信息哈希表中
void AddConnList(int sockfd, uint32_t event, func_t reader, func_t sender, func_t execpter)
{
// ET模式下,将fd设置为非阻塞
int n = SetNonBlock(sockfd);
if (n < 0)
{
printf("SetNonBlock 失败!\n");
exit(-1);
}
// 1.构建连接信息,并注册方法
connItem *conn = new connItem(sockfd, this);
conn->Register(reader, sender, execpter);
// 2.让epoll关心该事件
SetEvent(sockfd, event, EVENT_ADD);
// 3.放入连接信息哈希表中
_connlist.insert(std::make_pair(sockfd, conn));
}
void SetEvent(int sockfd, uint32_t event, int flag)
{
struct epoll_event ev;
ev.data.fd = sockfd;
ev.events = event;
if (flag == EVENT_ADD) // 新增
epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &ev);
else if (flag == EVENT_MOD) // 修改
epoll_ctl(_epfd, EPOLL_CTL_MOD, sockfd, &ev);
else if (flag == EVENT_DEL) // 删除
epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, 0);
}
void RemoveConn(int sockfd)
{
auto it = _connlist.find(sockfd);
if (it != _connlist.end())
{
// 从epoll中删除
SetEvent(sockfd, 0, EVENT_DEL);
// 关闭socket
close(sockfd);
// 释放内存
delete it->second;
// 从哈希表删除
_connlist.erase(it);
// printf("连接关闭: fd=%d, 剩余连接数: %zu\n", sockfd, _connlist.size());
}
}
serverEntry.cc
该文件是整个服务器程序的入口点。如果未来希望切换至其他网络编程模型(例如协程或异步 I/O 框架),只需将项目的主入口替换为目标框架所提供的启动方式即可,无需大规模重构业务逻辑。
a. Task.h / Task.cc
#pragma once
// 这里使用自定制协议
// 处理请求,发送响应
class connItem;
void kvstoreTask(connItem *conn);
#include "Task.h"
#include "Protocol.h"
#include "../reactorTcpServer/connItem.hpp" //解析模块与网络模块的交互
#include "../storage/commandExecutor.h" //解析模块与存储模块的交互
// 这里使用自定制协议
// 处理请求,发送响应
void kvstoreTask(connItem *conn)
{
printf("recv :%s\n", conn->_inbuffer.c_str());
// 接收并解析数据,构建请求完整报文。对于非法数据,啥处理都不做
std::string onePackage;
while (ParseOnePackage(conn->_inbuffer, onePackage))
{
// TCP数据会有粘包,所以要解析出一个报文进行处理
// 序列化建请求报文
kvstoreRequest req;
std::string ans;
if (!req.deserialize(onePackage))
{
// 数据错误,解析失败
ans = "ERROR please input:\nSET KEY VALUE\nGET KEY\nDEL KEY\nMOD KEY VALUE\nSIZE\n";
// 发送错误应答,解析下一个报文
conn->_outbuffer = ans;
conn->_sender(conn);
continue;
}
// 可以成功处理请求,解析层保证传入的数据是对的
ans = globalExecutor.execute(req);
// 根据请求,构建响应报文
// printf("ans : %s\n", ans);
// 发送响应
conn->_outbuffer = ans;
conn->_sender(conn);
}
}
此部分是网络层、解析层与存储层之间交互的核心区域。网络层通过 kvstoreTask 提交接收到的数据包,解析层完成报文解析后,将封装好的请求提交给存储层的 globalExecutor 执行具体操作,最终将响应结果返回至网络层,由其发送回客户端。
b. protocol.h / protocol.cc
.h 文件内容
#pragma once
#include <string>
// 解析一份报文
bool ParseOnePackage(std::string &inbuffer, std::string &rcv_text);
// 判断报文是否合法
bool isValidCommand(const std::string &cmd);
// kvstroe请求报文
class kvstoreRequest
{
public:
// 序列化请求报文
void serialize();
// 反序列化请求报文,将一个报文数据反序string列化
bool deserialize(const std::string &onePacPage);
// 判断字符串是否非法,最好在创建时候验证,减少检测消耗
bool isValid();
public:
const std::string &getop() const;
const std::string &getkey() const;
const std::string &getvalue() const;
private:
std::string op;
std::string key;
std::string value;
};
// kvstroe响应报文
class kvstroeResponse
{
public:
// 序列化请求报文
void serialize();
// 反序列化请求报文
void deserialize();
private:
};
主要定义了请求与响应的结构体,包含序列化与反序列化的接口。同时提供了 ParseOnePackage 函数,用于从字节流中提取一个完整的数据包。由于本项目返回值较为简单(如 OK、ERROR、NO EXIST 等字符串),因此未对响应报文做复杂设计。
protocol.cc 实现细节
#include "Protocol.h"
#include <vector>
#include <iostream>
#include <sstream>
// 解析报文
// 将收到的数据inbuffer中解析为一个个报文
// client -> server
// set key value
// get key
// del key
// mody key value
bool ParseOnePackage(std::string &inbuffer, std::string &rcv_text)
{
#if 1
// 接收数据为空,直接返回
if (inbuffer.empty())
return false;
// 清空上一次的报文
rcv_text.clear();
// 开始解析报文,这里先直接简单处理
rcv_text = inbuffer;
inbuffer.clear();
return true;
#else
// 找到一个报文的\r\n
auto pos = inbuffer.find("\r\n");
if (pos == std::string::npos)
return false;
// 清空之前报文,获取一份新报文
rcv_text.clear();
rcv_text = inbuffer.substr(0, pos);
// 删除缓冲区取出的数据
inbuffer.erase(0, pos + 2);
// 检测报文是否合法
return isValidCommand(rcv_text);
#endif
}
// 判断接收的报文是否合法
bool isValidCommand(const std::string &cmd)
{
// 检测报文是否为空
if (cmd.empty())
return false;
return cmd.find("SET ") == 0 ||
cmd.find("GET ") == 0 ||
cmd.find("DEL ") == 0 ||
cmd.find("MOD ") == 0 ||
cmd.find("SIZE") == 0;
}
//--------------------------------------------Request--------------------------------------------
//--------------------------------------------Request--------------------------------------------
// 序列化请求报文
void kvstoreRequest::serialize() {}
// 反序列化请求报文,将一个报文数据反序string列化
bool kvstoreRequest::deserialize(const std::string &onePacPage)
{
std::vector<std::string> tokens;
std::string token;
// set key value
std::stringstream ss(onePacPage);
while (ss >> token)
tokens.emplace_back(token);
if (tokens.size() > 3)
return false;
op = tokens[0];
key = tokens.size() > 1 ? tokens[1] : "";
value = tokens.size() > 2 ? tokens[2] : "";
// std::cout << "序列化数据为:" << op << " " << key << " " << value << std::endl;
return isValid();
}
// 判断字符串是否非法,最好在创建时候验证,减少检测消耗
bool kvstoreRequest::isValid()
{
if (op == "SIZE" && key.empty() && value.empty())
return true;
if (key.empty())
return false;
if (op == "SET" || op == "MOD")
return !value.empty();
else if (op == "GET" || op == "DEL")
return value.empty(); // 防止 命令 GET key value 和 DEL key value
return false;
}
const std::string &kvstoreRequest::getop() const { return op; }
const std::string &kvstoreRequest::getkey() const { return key; }
const std::string &kvstoreRequest::getvalue() const { return value; }
//--------------------------------------------Request--------------------------------------------
//--------------------------------------------Request--------------------------------------------
//--------------------------------------------Response--------------------------------------------
//--------------------------------------------Response--------------------------------------------
// 序列化响应报文
void kvstroeResponse::serialize()
{
}
// 反序列化响应报文
void kvstroeResponse::deserialize()
{
}
//--------------------------------------------Response--------------------------------------------
//--------------------------------------------Response--------------------------------------------
重点关注 ParseOnePackage 函数,其实现支持两种报文分割方式:一种为不作特殊处理,另一种使用分隔符 \r\n 来界定报文边界。采用分隔符可有效缓解 TCP 粘包问题,尽管更优方案是在每个报文前附加长度字段加标识符的方式。
请求的反序列化过程旨在从原始数据中提取出操作类型(op)、键(key)和值(value),然后提交至存储层执行。对于格式错误的请求,函数应返回 false,并由 kvstoreTask 进行相应处理。
a. commandExecutor.h / commandExecutor.cc
该模块负责接收来自解析层的请求 req,并调度执行相应的命令,如 SET、DEL、GET、MOD、SIZE 等。
#pragma once
#include "../protocol/Protocol.h"
#include "kvStorages.h"
#include <unordered_map>
#include <functional>
#include <memory>
class commandExecutor
{
public:
// 构造函数,用于注册方法
commandExecutor();
// 设置存储引擎
void setStorage(std::unique_ptr<kvStorags> kvStoragePtr);
// 执行函数,根据传入的数据执行方法表相应的方法
std::string execute(const kvstoreRequest &req);
private:
// 注册方法表
void registerCommands();
private:
std::unique_ptr<kvStorags> _kvStoragePtr;
std::unordered_map<std::string, std::function<std::string(const kvstoreRequest &)>> _cmds;
};
// 声明全局的存储引擎和执行器
extern commandExecutor globalExecutor;
关键成员说明:
#include "commandExecutor.h"
// 定义全局存储引擎
commandExecutor globalExecutor;
// 构造函数,用于注册方法,默认使用哈希
commandExecutor::commandExecutor()
: _kvStoragePtr(new HashStorage())
{
registerCommands();
}
// 设置存储引擎
void commandExecutor::setStorage(std::unique_ptr<kvStorags> kvStoragePtr)
{
// 注意 unique_ptr是独占智能指针,转移管理权必须使用 std::move
_kvStoragePtr = std::move(kvStoragePtr);
}
void commandExecutor::registerCommands()
{
// 注册方法列表
_cmds["SET"] = [this](const kvstoreRequest &req)
{ return _kvStoragePtr->SET(req.getkey(), req.getvalue()) ? "OK" : "SET FAILED"; };
_cmds["GET"] = [this](const kvstoreRequest &req)
{
const std::string &value = _kvStoragePtr->GET(req.getkey());
return value.empty() ? "NO EXIST" : value;
};
_cmds["DEL"] = [this](const kvstoreRequest &req)
{ return _kvStoragePtr->DEL(req.getkey()) ? "OK" : "NO EXIST"; };
_cmds["MOD"] = [this](const kvstoreRequest &req)
{ return _kvStoragePtr->MOD(req.getkey(), req.getvalue()) ? "OK" : "NO EXIST"; };
_cmds["SIZE"] = [this](const kvstoreRequest &req)
{ return std::to_string(_kvStoragePtr->SIZE()); };
}
// 执行函数,根据传入的数据执行相应的方法
std::string commandExecutor::execute(const kvstoreRequest &req)
{
auto it = _cmds.find(req.getop());
// 解析模块保证传输数据是有效的
// if (it == _cmds.end())
// return "cmd error please input SET GET MOD DEL";
return it->second(req);
}
b. storage.h / storage.cc
这是实际的数据存储实现部分,采用策略模式进行设计。定义了一个抽象基类 storage,所有具体存储类型(如 array、hash、rbtree、lrucache)均继承自该基类,并重写虚函数以实现各自特有的数据操作逻辑。
#pragma once
#include <vector>
#include <list>
#include <map>
#include <unordered_map>
// 根据用户需求,选择不同的存储数据结构。采用策略模式
// 方案有 哈希 红黑树 数组 跳表 LRUCache
class kvStorags
{
public:
// 策略模式,保证基类析构函数是虚函数
virtual ~kvStorags() = default;
// 四种方法的操作定义
virtual bool SET(const std::string &key, const std::string &value) = 0;
virtual std::string GET(const std::string &key) = 0;
virtual bool DEL(const std::string &key) = 0;
virtual bool MOD(const std::string &key, const std::string &value) = 0;
virtual size_t SIZE() const = 0;
};
// 用于存储数据的类,还需要考虑线程安全的问题
class RBTreeStorage : public kvStorags
{
public:
// 四种方法的操作定义
virtual bool SET(const std::string &key, const std::string &value) override;
virtual std::string GET(const std::string &key) override;
virtual bool DEL(const std::string &key) override;
virtual bool MOD(const std::string &key, const std::string &value) override;
virtual size_t SIZE() const;
private:
std::map<std::string, std::string> _storage{}; // 红黑树存储
};
// 用于存储数据的类,还需要考虑线程安全的问题
class HashStorage : public kvStorags
{
public:
// 四种方法的操作定义
virtual bool SET(const std::string &key, const std::string &value) override;
virtual std::string GET(const std::string &key) override;
virtual bool DEL(const std::string &key) override;
virtual bool MOD(const std::string &key, const std::string &value) override;
virtual size_t SIZE() const;
private:
std::unordered_map<std::string, std::string> _storage{10000}; // 哈希存储,预分配空间减少哈希冲突;
};
// 用于存储数据的类,还需要考虑线程安全的问题
class ArrayStorage : public kvStorags
{
public:
// 四种方法的操作定义
virtual bool SET(const std::string &key, const std::string &value) override;
virtual std::string GET(const std::string &key) override;
virtual bool DEL(const std::string &key) override;
virtual bool MOD(const std::string &key, const std::string &value) override;
virtual size_t SIZE() const;
private:
std::vector<std::pair<std::string, std::string>> _storage{10000};
};
// 用于存储数据的类,还需要考虑线程安全的问题
class LRUCacheStorage : public kvStorags
{
public:
LRUCacheStorage() : _capacity(10000) {}
// 四种方法的操作定义
virtual bool SET(const std::string &key, const std::string &value) override;
virtual std::string GET(const std::string &key) override;
virtual bool DEL(const std::string &key) override;
virtual bool MOD(const std::string &key, const std::string &value) override;
virtual size_t SIZE() const;
private:
// 需要一个迭代器
using iter = std::list<std::pair<std::string, std::string>>::iterator;
int _capacity;
std::list<std::pair<std::string, std::string>> _LRUList;
std::unordered_map<std::string, iter> _hashmap;
};
storage.cc 文件中包含了基本的增删改查操作实现,逻辑清晰,具体细节可参考代码仓库。
包含主函数 main,主要职责是配置默认存储引擎并启动服务器实例。
#include "storage/commandExecutor.h"
#include <cstring>
int serverEntry();
// 初始化存储引擎
void initEgineKvstore(const std::string &storage)
{
if (storage == "array")
globalExecutor.setStorage(std::make_unique<ArrayStorage>());
else if (storage == "rbtree")
globalExecutor.setStorage(std::make_unique<RBTreeStorage>());
else if (storage == "lru")
globalExecutor.setStorage(std::make_unique<LRUCacheStorage>());
else
{
printf("未选择或者错误选择存储引擎, 默认使用hash\n");
}
}
int main(int argc, char *argv[])
{
std::string storageType = "hash";
// 初始化存储引擎
for (int i = 1; i < argc; ++i)
{
if ((strcmp(argv[i], "--storage") == 0) && i + 1 < argc)
storageType = argv[++i];
else if (strcmp(argv[i], "--help") == 0)
{
printf("\r\n%s [--storage hash|rbtree|array|lru] [--help]\r\n\r\n", argv[0]);
printf("Default storage engine: hash\r\n\r\n");
return 0;
}
else
{
printf("\r\nUnknown option\r\n\r\n");
printf("KVStore Usage:\r\n%s [--storage hash|rbtree|array|lru] [--help]\r\n\r\n", argv[0]);
return -1;
}
}
printf("初始化存储引擎\n");
initEgineKvstore(storageType);
// 启动服务器,在这里可以选择不同的网络框架。
// 如果想要使用协程网络框架,直接调用Task.h中的kvstoreTask,然后执行相应的交互即可
printf("初始化服务器\n");
serverEntry();
}
测试环境为一台配置为 2核2G 内存的云服务器,操作系统为 CentOS 8。
项目构建完成后,需进行以下几项测试验证:
首先进行基础功能验证:


结果显示,整个操作流程执行正常,各命令返回符合预期,功能完整可用。
使用仓库中 Prestandatest 目录下的 qpsTest.cc 工具进行压测。建议选择 Hash 存储引擎,多次运行取平均值以提高准确性。
测试参数设定为:并发连接数 × 每个连接发送的请求数量(混合发送 SET 与 GET 请求)。
基础功能测试: 全部通过
=== 性能测试 ===
并发数: 100
每线程请求数: 5000 (SET+GET)
总请求数: 1000000
测试结果:
总耗时: 29423 ms
总请求: 1000000
成功请求: 1000000
失败请求: 0
成功率: 100%
QPS: 33987
基础功能测试: 全部通过
=== 性能测试 ===
并发数: 100
每线程请求数: 5000 (SET+GET)
总请求数: 1000000
测试结果:
总耗时: 28154 ms
总请求: 1000000
成功请求: 1000000
失败请求: 0
成功率: 100%
QPS: 35518.9
基础功能测试: 全部通过
=== 性能测试 ===
并发数: 200
每线程请求数: 2500 (SET+GET)
总请求数: 1000000
测试结果:
总耗时: 29177 ms
总请求: 1000000
成功请求: 1000000
失败请求: 0
成功率: 100%
QPS: 34273.6
=== 性能测试 ===
并发数: 50
每线程请求数: 10000 (SET+GET)
总请求数: 1000000
测试结果:
总耗时: 27165 ms
总请求: 1000000
成功请求: 1000000
失败请求: 0
成功率: 100%
QPS: 36812.1
=== 性能测试 ===
并发数: 100
每线程请求数: 50000 (SET+GET)
总请求数: 10000000
测试结果:
总耗时: 423004 ms
总请求: 10000000
成功请求: 10000000
失败请求: 0
成功率: 100%
QPS: 23640.4从测试结果可以看出,当总请求量达到100万时,系统的QPS均值约为35000;而当请求总量上升至1000万时,QPS下降至20000以上。推测性能下降的主要原因为数据规模增大导致哈希冲突频率上升,进而使得每次插入与查询操作的开销增加。
| 测试编号 | 并发数 | 每线程请求数 | 总请求数 | 总耗时(ms) | QPS | 成功率 | 备注 |
|---|---|---|---|---|---|---|---|
| 测试1 | 100 | 5,000 | 1,000,000 | 29,423 | 33,987 | 100% | 基准测试 |
| 测试2 | 100 | 5,000 | 1,000,000 | 28,154 | 35,518 | 100% | 基准测试 |
| 测试3 | 200 | 2,500 | 1,000,000 | 29,177 | 34,274 | 100% | 基准测试 |
| 测试4 | 50 | 10,000 | 1,000,000 | 27,165 | 36,812 | 100% | 最佳性能 |
| 测试5 | 100 | 50,000 | 10,000,000 | 423,004 | 23,640 | 100% | 压力测试 |
使用项目仓库中的 stressConnectionTest 工具进行测试。此前已多次执行相关实验,当前测试结果刚好接近服务器所能承受的最大负载状态。
启动连接压力测试...
存储引擎模式: Hash
=== 真实连接压力测试 ===
目标最大连接数: 30000
测试持续时间: 100 秒
服务器: 127.0.0.1:8080
测试模式: 建立连接 + 持续数据交互 (70% GET, 30% SET)
工作线程数: 50
每线程连接数: 600
[0s] 连接数: 0 | 成功: 0 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[2s] 连接数: 9821 | 成功: 9821 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[4s] 连接数: 14533 | 成功: 14533 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[6s] 连接数: 15257 | 成功: 15257 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[8s] 连接数: 15915 | 成功: 15915 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[10s] 连接数: 16599 | 成功: 16599 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[12s] 连接数: 17253 | 成功: 17253 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[14s] 连接数: 17899 | 成功: 17899 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[16s] 连接数: 18513 | 成功: 18513 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[18s] 连接数: 19130 | 成功: 19130 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[20s] 连接数: 19734 | 成功: 19734 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[22s] 连接数: 20313 | 成功: 20313 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[24s] 连接数: 20870 | 成功: 20870 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[26s] 连接数: 21446 | 成功: 21446 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[28s] 连接数: 22007 | 成功: 22007 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[30s] 连接数: 22521 | 成功: 22521 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[32s] 连接数: 23032 | 成功: 23032 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[34s] 连接数: 23542 | 成功: 23542 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[36s] 连接数: 24044 | 成功: 24044 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[38s] 连接数: 24538 | 成功: 24538 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[40s] 连接数: 25046 | 成功: 25046 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[42s] 连接数: 25519 | 成功: 25519 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[44s] 连接数: 26009 | 成功: 26009 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[46s] 连接数: 26509 | 成功: 26509 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[48s] 连接数: 26963 | 成功: 26963 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[50s] 连接数: 27417 | 成功: 27417 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[52s] 连接数: 27862 | 成功: 27862 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[54s] 连接数: 28232 | 成功: 28232 | 失败: 7 | 请求: 0 | 请求成功率: 0%
[56s] 连接数: 28232 | 成功: 28232 | 失败: 283 | 请求: 0 | 请求成功率: 0%
[58s] 连接数: 28232 | 成功: 28232 | 失败: 558 | 请求: 0 | 请求成功率: 0%
[60s] 连接数: 28232 | 成功: 28232 | 失败: 833 | 请求: 0 | 请求成功率: 0%
[62s] 连接数: 28232 | 成功: 28232 | 失败: 1102 | 请求: 0 | 请求成功率: 0%
[64s] 连接数: 28232 | 成功: 28232 | 失败: 1390 | 请求: 0 | 请求成功率: 0%
[66s] 连接数: 28232 | 成功: 28232 | 失败: 1666 | 请求: 9 | 请求成功率: 100%
[68s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 799 | 请求成功率: 100%
[70s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 1792 | 请求成功率: 100%
[72s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 2792 | 请求成功率: 100%
[74s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 3792 | 请求成功率: 100%
[76s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 4792 | 请求成功率: 100%
[78s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 5792 | 请求成功率: 100%
[80s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 6778 | 请求成功率: 100%
[82s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 7778 | 请求成功率: 100%
[84s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 8778 | 请求成功率: 100%
[86s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 9771 | 请求成功率: 100%
[88s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 10771 | 请求成功率: 100%
[90s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 11771 | 请求成功率: 100%
[92s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 12771 | 请求成功率: 100%
[94s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 13771 | 请求成功率: 100%
[96s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 14767 | 请求成功率: 100%
[98s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 15767 | 请求成功率: 100%
[100s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 16752 | 请求成功率: 100%
=== 压力测试结果 ===
总耗时: 102293 ms
最大并发连接数: 28232
失败连接数: 1768
连接成功率: 94.1067%
总请求数: 17752
请求成功率: 100%
平均连接建立速度: 275.992 连接/秒
平均请求QPS: 173.541 请求/秒
=== 结果分析 ===
???? 服务器表现良好 - 接近承载极限
在单个端口的情况下,系统能维持的最大并发连接数为28232。该限制可能源于操作系统层面的约束。每个TCP连接由一个四元组(源IP、源端口、目的IP、目的端口)唯一标识。由于客户端与服务端程序运行在同一台机器上,可用端口数量受到限制,从而影响了最大连接数。
所有成功建立的连接在稳定性方面表现良好。若需进一步提升最大连接数,可考虑以下几种优化方式:
本项目让我深刻体会到高并发系统设计的复杂性。通过亲手实现各个核心模块,我在网络编程、内存管理、并发控制等方面获得了扎实的实践经验。这不仅是一次技术能力的全面提升,更是一场工程思维方式的深度锤炼。
扫码加好友,拉您进群



收藏
