大家好,欢迎来到IT知识分享网。
一、系统介绍
Sunnet系统是用C++实现的模仿Skynet的游戏服务器后端。Sunnet是多线程的服务端架构,通过多线程调度充分利用了机器的性能。
Catalogue:
- include:存放头文件(.h)
- src:存放源文件(.cpp)
- build:存放构建工程时的临时文件、可执行文件
- 3rd:存放第三方源码(这里存放编译好的Lua源码)
- service:存放各类型服务的Lua脚本
- luaclib:存放一些C模块(.so文件),提供給Lua脚本用
- luaclib_src:存放C模块的源代码(.c、.h)
- lualib:存放Lua模块,提供給service業務代碼使用
- proto:存放通信协议文件(.proto)
- tools:存放工具文件
- CMakeList.txt:CMake的指导文件
二、系统架构
include目錄
- Sunnet.h:架构底层,静态类。管理着:SocketWorker对象、Worker对象列表、Service对象列表、Conn对象列表、全局队列(globalQueue);以及对这些所管理对象的操作API。
- SocketWorker:socket网络线程类。
- Worker.h:工作线程类。Sunnet开启的工作线程具体实现。
- Service.h:服务类。管理自己的消息队列、Lua虚拟机(luaState)。
- Conn.h:连接类。每个和和客户端连接的socket对应一个Conn对象。
- ConnWriter.h:自己封装实现的一个写socket缓冲,用于有保证地发送长信息。
- Msg.h:协议类。
- LuaAPI.h:提供给Lua虚拟机(luaState)使用的C++函数。
- Atomic.h:原子操作函數定義。
- Timer.h:定時器綫程類。
- Monitor.h:監視器綫程類。
2.1 Sunnet进程的运行
Sunnet管理着:
- 一个网络线程(SocketThread)
- 多个工作线程(WorkerThreads)
- 一個定時器綫程(TimerThread)
- 一個監視器綫程(MonitorThread)
- 一个全局队列(globalQueue)
- 多个服务(Services)
- 多个与客户端的连接(Conns)
整个程序入口如下,其实就是创建一个静态类Sunnet,然后调用Sunnet::inst->Start()。
int main(){ //创建Sunnet引擎 new Sunnet(); //开始引擎 Sunnet::inst->Start(); //等待回收工作线程、网络线程、定時器綫程、監視器綫程 Sunnet::inst->Wait(); return 0; }
Sunnet::inst->Start()代码如下,其实就是开启多条工作线程、一個网络线程,一個定時器綫程,一個監視器綫程。这些线程都是在while()里循环执行的,因此程序入口main函数的Sunnet::inst->Wait()是一直回收不了子线程,所以main线程是阻塞的。
//开启系统 void Sunnet::Start(){ //開啓Monitor StartMonitor(); //开启Worker StartWorker(); //开启Socket StartSocket(); //开启Timer StartTimer(); }
总结:
- 该进程的main线程阻塞在
Sunnet::inst->Wait()。 - 该进程的一个网络线程(SocketThread)死循环执行。
- 该进程的多个工作线程(WorkerThreads)死循环执行。
- 该进程的一个定時器线程(TimerThread)死循环执行。
- 该进程的一个監視器线程(MonitorThread)死循环执行。
那么分析这个进程的执行入口就变成了,直接分析四个地方:
- 一个网络线程(SocketThread)在死循环里干了啥
- 多个工作线程(WorkerThreads)在死循环里干了啥
- 一个定時器线程(TimerThread)在死循环里干了啥
- 一个監視器线程(MonitorThread)在死循环里干了啥
2.1.1 SocketThread线程的运行
上面提到,Sunnet进程只有一个SocketThread线程。这个是死循环执行的。代码如下:
void SocketWorker::operator()() { while(true){ //阻塞等待 const int EVENT_SIZE = 64; struct epoll_event events[EVENT_SIZE]; int eventCount = epoll_wait(epollFd, events, EVENT_SIZE, -1); //取得事件 for(int i = 0; i < eventCount; ++i) { epoll_event ev = events[i]; //当前要处理的事件 OnEvent(ev); } } }
可见,SocketThread线程使用了Linux操作系统提供的epoll。
epoll_wait()的做法是如果epoll对象(通过系统API创建的操作系统管理的一个对象)里没有事件消息,那么这个SocketThread线程就阻塞在epoll_wait()这里,不会占用CPU资源。
如果有客户端发来消息时(可能同时有多个客户端发来消息),就会唤醒这个SocketThread线程往下执行,执行OnEvent(ev)。
OnEvent(ev)这个函数执行流程:
- 如果是新连接的客户端发来消息:
- 新建一个Conn对象,绑定socketfd和服务(Service)id
- 把新建的Conn对象交给Sunnet的Conns列表管理
- 把新连接的客户端socketfd绑定到epoll对象进行监听
- 如果是已连接的客户端发来消息:
- 根据和客户端通信的socketfd,找到Sunnet的Conns列表对应的Conns。
- 根据Conns找到对应的服务(Service)。
- 把和客户端通信的信息发送到服务(Service)的消息列表中。
- 把这个服务(Service)插入到全局队列(globalQueue)。
- 唤醒工作线程(WorkerThreads)去处理服务(Service)。这里的唤醒用到了条件变量+互斥锁(pthread_cond_t + pthread_mutex_t)实现。
如左上图,黑色小圆圈代表服务(Service),后面跟着的长方形是消息队列。服务1有4条信息,服务2有1条信息,服务3有3条信息。它们一开始没有信息时(消息队列为空)只是躺在Sunnet的Services列表里,一旦有信息后,立马被插入到全局队列(globalQueue)中。
如右上图,工作线程(WorkerThreads)被唤醒后(全局队列(globalQueue)不为空啦),就会把要处理的服务(Service)弹出全局队列(globalQueue),然后处理服务(Service)消息队列里的消息,这里可以设置信息的条数,如果一次处理不完,可以把这个服务(Service)重新插入全局队列(globalQueue)队尾,等待下次某个工作线程(WorkerThreads)抢到执行权去执行。
2.1.2 WorkerThreads线程的运行
上面提到,Sunnet进程有多个WorkerThreads线程,也是死循环的,它们用同一套代码:
void Worker::operator()() { while(true) { std::shared_ptr<Service> srv = Sunnet::inst->PopGlobalQueue(); if (!srv) { Sunnet::inst->WorkerWait(); } else { srv->ProcessMsgs(eachNum); CheckAndPutGlobal(srv); } } }
如上可见,工作线程的工作就是:
从Sunnet管理的全局队列(globalQueue)里弹出一个服务(Service)。
如果是空的,说明目前没有任何服务(Service)有信息要处理的,那么这个工作线程就会调用Sunnet::inst->WorkerWait()阻塞等待。
如果有信息,那么就调用srv->ProcessMsgs(eachNum)直接去处理服务(Service)消息队列里指定数量的信息,如果这个服务(Service)的信息没全部处理完,可以重新插入全局队列(globalQueue),等待下次某个工作线程抢到CPU继续执行处理服务(Service)的消息。
处理服务(Service)消息的时候,流程是srv->ProcessMsgs(eachNum) ——> srv->ProcessMsg() ——> srv->OnMsg()
代码如下:
void Service::ProcessMsgs(int max) { for (int i = 0; i < max; ++i) { bool succ = ProcessMsg(); if(!succ) { break; } } } bool Service::ProcessMsg() { std::shared_ptr<BaseMsg> msg = PopMsg(); if (msg) { OnMsg(msg); return true; } else { return false; //返回值预示着队列是否为空 } } void Service::OnMsg(std::shared_ptr<BaseMsg> msg) { //std::cout << "[" << id << "] OnMsg" << std::endl; switch(msg->type) { case (BaseMsg::TYPE::SERVICE): { auto m = std::dynamic_pointer_cast<ServiceMsg>(msg); OnServiceMsg(m); break; } case (BaseMsg::TYPE::SERVICE_CALLBACK): { auto m = std::dynamic_pointer_cast<ServiceMsg>(msg); OnServiceCallbackMsg(m); break; } case (BaseMsg::TYPE::SOCKET_ACCEPT): { auto m = std::dynamic_pointer_cast<SocketAcceptMsg>(msg); OnAcceptMsg(m); break; } case (BaseMsg::TYPE::SOCKET_RW): { auto m = std::dynamic_pointer_cast<SocketRWMsg>(msg); OnRWMsg(m); break; } default: break; } }
如代码可见,srv->OnMsg()方法里,根据消息类型进行强转后,根据不同消息类型调用不同的处理函数,这些函数里,又会调用LuaAPI函数,然后执行Lua代码。
2.1.3 TimerThread线程的运行
上面提到,Sunnet进程只有一个TimerThread线程。这个是死循环执行的。代码如下:
void Timer::operator()() { while(true) { int sleep_time = Sunnet::inst->GetNearestTimer(); usleep(sleep_time); Sunnet::inst->ExpireTimer(); // 更新检测定时器,并把定时事件发送到消息队列中 } }
如上可见,定時器线程的工作就是:
- 從最小堆(定時器存儲定時事件的底層我這裏是最小堆實現的)中拿出最近過期的時間,然後休眠(休眠綫程期間不占用CPU資源)。
Sunnet::inst->ExpireTimer()這個代碼是遍歷最小堆,把到期的事件取出,然後調用Sunnet::inst->Send(),把事件插入對應的服務,然後服務插入到全局隊列,最後worker綫程從全局隊列拿到服務后,就可以處理這個定時事件了。
void Timer::ExpireTimer() { if (_heap.empty()) return; uint32_t now = current_time(); do { TimerNode* node = _heap.front(); if (now < node->expire) break; auto msg = Sunnet::inst->MakeCallbackMsg(node->service_id, node->cb, strlen(node->cb)); Sunnet::inst->Send(node->service_id, msg); _delNode(node); } while(!_heap.empty()); }
首先我們看提供給Lua調用的定時器API,為了節省篇幅,我們拿添加定時器接口解釋。可以看到下面代碼,C++從Lua棧中取出3個值:第一個是發起定時器事件所屬的服務id,第二個是定時器事件的超時時間,第三個是需要回調到Lua的Lua函數名。注意這裏傳過來的是Lua的函數名,爲什麽不直接把Lua函數傳過來呢?因爲Lua本身是不支持将Lua函数作为函数参数传入C/C++的,不管这个想要传入的函数是全局的 、局部的、或者匿名的(匿名的本质上也算局部的)。
//添加定时器 int LuaAPI::AddTimer(lua_State *luaState){ //参数个数 int num = lua_gettop(luaState); //参数1:service_id if(lua_isinteger(luaState, 1) == 0) { lua_pushinteger(luaState, -1); return 1; } int service_id = lua_tointeger(luaState, 1); //参数2:expire 超时时间 if(lua_isinteger(luaState, 2) == 0) { lua_pushinteger(luaState, -1); return 1; } int expire = lua_tointeger(luaState, 2); //参数3:func_name if(lua_isstring(luaState, 3) == 0) { lua_pushinteger(luaState, -1); return 1; } size_t len = 0; const char *func_name = lua_tolstring(luaState, 3, &len); char *newstr = new char[len+1]; //后面加\0 newstr[len] = '\0'; memcpy(newstr, func_name, len); //将字符串又复制一遍原因是Lua字符串是Lua虚拟机管理的,其带有垃圾回收机制,复制一遍为了防止可能发生的冲突 int id = Sunnet::inst->AddTimer(service_id, expire, newstr); //返回值 lua_pushinteger(luaState, id); return 1; }
C/C++那边仅支持传入一个全局函数名(当然不一定得全局的,根据实际情况,可能在其他自己构造的表里也行),那麽Lua怎麽把一個局部的、或者匿名的函數傳給C/C++使用呢?
我的思路就是将Lua函数和一个唯一的字符串做映射(提供wrap函數產生一個唯一的全局函數名)。
同時,需要考慮到在多次调用wrap函数后,将导致全局表也随之膨胀。我们需要想办法在C/C++完成回调后,来清除wrap建立的数据。这个工作当然可以放到C/C++来进行 ,例如每次发生回调后,就设置下全局表。但这明显是不对的,因为违背了接口的设计原则 ,这个额外的机制是在Lua里添加的,那么责任也最好由Lua来负。
要解决这个问题,就可以 使用Lua的metamethods机制。这个机制可以在Lua内部发生特定事件时,让应用层得到通知。 这里,我们需要关注__call事件。Lua中只要有__call metamethod的值,均可被当作函数调用。
id = 0 local function generate_func_id() id = id + 1 return id end local function del_callback(name) _G[name] = nil end local function create_callback_table (func, name) local t = {} t.callback = func --创建元表。元方法__call。目的是在c++层,可以直接通过func_name调用_G[func_name](即t),然后执行__call里的函数 setmetatable (t, {__call = -- 关注__call function (func, ...) -- 在t(xx)时,将调用到这个函数 func.callback(...) -- 真正的回调 del_callback(name) -- 回调完毕,清除wrap建立的数据 end }) return t end local function wrap (func) local id = generate_func_id() -- 产生唯一的id local fn_s = "_callback_fn".. id --生成唯一函數名 _G[fn_s] = create_callback_table(func, fn_s) -- _G[fn_s]对应的是一个表 return fn_s end function AddTimer(serviceId, expire, func) local func_name = wrap(func) --调c++函数 return sunnet.AddTimer(serviceId, expire, func_name) end
定時器事件到期時,發給服務的消息類型是SERVICE_CALLBACK,worker綫程拿到服務消費時,就會調用到這個函數。這個函數直接通過Lua給我們提供的唯一函數名func_name,調用到全局表_G[func_name]對應的t表的元表的__call的元方法,如上代碼。最終就會調用到Lua的回調方法。
void Service::OnServiceCallbackMsg(std::shared_ptr<ServiceMsg> msg) { std::cout << " OnServiceCallbackMsg " << std::endl; //调用Lua函数 lua_getglobal(luaState, msg->buff.get()); int isok = lua_pcall(luaState, 0, 0, 0); if(isok != 0) { //若返回值为0则代表成功,否者代表失败 std::cout << "call lua OnServiceCallbackMsg fail" << lua_tostring(luaState, -1) << std::endl; } }
測試代碼如下,下面創建了一個定時器,3ms后,調用函數内的局部函數test。
-- 创建定时器测试 function create_timer_test(serviceId) local function test() print("!!! --- [lua] [create_timer_test callback success] --- !!! ") end local timer_id = AddTimer(serviceId, 3, test) return timer_id end
2.1.4 MonitorThread线程的运行
上面提到,Sunnet进程只有一个MonitorThread线程。这个是死循环执行的。代码如下:
void Monitor::operator()() { while(true) { //每5秒檢測一次 usleep(5*DEFAULT_SLEEP_TIME); Sunnet::inst->MonitorCheck(); } }
先介紹一下Monitor監視器對象,Monitor監視所有的Worker綫程,因此每個Worker綫程對應一個struct WrorkerMonitor結構,但worker對象沒必要保存這個結構對象,統一由Monitor對象管理即可,兩者之間的關聯只要通過worker_id關聯即可。
struct WrorkerMonitor { int version; int check_version; int service_id; }; class Monitor { public: ... int _count; //監視數量 std::unordered_map<uint32_t, std::shared_ptr<WrorkerMonitor>> wrorkerMonitors; //監視對象 public: //获取監視對象 std::shared_ptr<WrorkerMonitor> GetWorkerMonitor(uint32_t worker_id); ... };
worker线程处理服務消息前調用MonitorTrigger()方法记录服務的id。处理完清除。
std::shared_ptr<Service> srv = Sunnet::inst->PopGlobalQueue(); if(src){ //拿出service消費前,先標注一下 Sunnet::inst->MonitorTrigger(id, srv->id); //消費服務 srv->ProcessMsgs(eachNum); //是否將服務重新插入全局隊列 CheckAndPutGlobal(srv); //消費完service,標注一下 Sunnet::inst->MonitorTrigger(id, 0); }
標注代碼
void Monitor::MonitorTrigger(uint32_t worker_id, int service_id) { std::shared_ptr<WrorkerMonitor> worker_monitor = GetWorkerMonitor(worker_id); if (!worker_monitor) return; worker_monitor->version ++; worker_monitor->service_id = service_id; }
從Monitor綫程死循環可以看到,Monitor監視器,每隔5秒就會檢測一次所有的Worker綫程是否陷入死循環。
判斷原理是version和check_version是否一致,如果一致并且service_id>0,説明這個worker綫程消費這個service超過了5秒鈡,很可能是service消息中有死循環,關注一下Lua代碼是否有死循環了。
void Monitor::MonitorCheck() { CHECK_ABORT int worker_id = 0; for (worker_id = 0; worker_id < Count(); ++worker_id) { std::shared_ptr<WrorkerMonitor> worker_monitor = GetWorkerMonitor(worker_id); if (!worker_monitor) return; if (worker_monitor->version == worker_monitor->check_version) { if (worker_monitor->service_id) { Sunnet::inst->OnServiceErr(worker_monitor->service_id); } } else { worker_monitor->check_version = worker_monitor->version; } } }
如果發生死循環,我這裏的處理是,調用Sunnet::inst->OnServiceErr(worker_monitor->service_id);直接通知服務的Lua層發生錯誤(因爲業務代碼都是Lua在寫,C++代碼僅僅是通知Lua消息,不可能發生死循環情況),這樣worker綫程才能正常執行下去。
注意不能直接殺死服務,如果是直接殺死服務,worker綫程會出現無法正常執行下去的狀況,這樣會導致worker綫程一直死循環了,無法休眠或者執行其他服務消息,也霸占了CPU資源。
void Service::OnServiceErr(){ std::cout << "[error] OnServiceErr " << std::endl; //调用Lua函数 //通知Lua函數錯誤 luaL_error(luaState, "script timeout."); }
2.2. Service
2.2.1 Service与Lua虚拟机
新建服务(Service)的任务在Sunnet,因为Sunnet管理服务列表的增删改查
uint32_t Sunnet::NewService(std::shared_ptr<std::string> type) { auto srv = std::make_shared<Service>(); srv->type = type; pthread_rwlock_wrlock(&servicesLock); { srv->id = maxId; maxId++; services.emplace(srv->id, srv); } pthread_rwlock_unlock(&servicesLock); srv->OnInit(); //初始化 return srv->id; }
如上代码所示,每个Service对象被创建后,调用OnInit()创建会一个Lua虚拟机,因此每个Service的Lua代码互相隔离。创建Lua虚拟机后,还调用LuaAPI::Register(luaState)方法,把C++的一些方法注册给Lua虚拟机(luaState)使用。
void Service::OnInit() { std::cout << "[" << id << "] OnInit" << std::endl; //新建Lua虚拟机 luaState = luaL_newstate(); //开启全部标准库 luaL_openlibs(luaState); //注册Sunnet系统API LuaAPI::Register(luaState); //执行Lua文件 std::string filename = "../service/" + *type + "/init.lua"; int isok = luaL_dofile(luaState, filename.data()); if(isok == 1) { //若成功则返回值未0,若失败则返回值为1 std::cout << "run lua fail:" << lua_tostring(luaState, -1) << std::endl; } //调用Lua函数 lua_getglobal(luaState, "OnInit"); //把指定全局变量压栈,并返回该值的类型 lua_pushinteger(luaState, id); //把整型数压栈 isok = lua_pcall(luaState, 1, 0, 0); //调用一个Lua方法。参数二代表Lua方法的参数值个数,参数三代表Lua方法的返回值个数,参数四代表如果调用失败应该采取什么样的处理方法,填写0代表使用默认方式 if(isok != 0) { //若返回值为0则代表成功,否则代表失败 std::cout << "call lua OnInit fail " << lua_tostring(luaState, -1) << std::endl; } }
如上代码所示,每个服务(Service)被创建后,会执行一次Lua函数OnInit。
杀死服务(Service)的任务在Sunnet,因为Sunnet管理服务列表的增删改查
void Sunnet::KillService(uint32_t id) { std::shared_ptr<Service> srv = GetService(id); if (!srv) return; //退出前 srv->OnExit(); srv->isExiting = true; //删除前 pthread_rwlock_wrlock(&servicesLock); { services.erase(id); } pthread_rwlock_unlock(&servicesLock); } void Service::OnExit() { std::cout << "[" << id << "] OnExit" << std::endl; //调用Lua函数 lua_getglobal(luaState, "OnExit"); int isok = lua_pcall(luaState, 0, 0, 0); //C++与Lua是单线程交互,lua_pcall的执行时间即Lua脚本的运行时间。 if(isok != 0) { //若返回值为0则代表成功,否则代表失败 std::cout << "call lua OnExit fail " << lua_tostring(luaState, -1) << std::endl; } //关闭Lua虚拟机 lua_close(luaState); }
如上代码所示,每个服务(Service)被杀死后,会执行一次Lua函数OnExit。
工作线程(WorkerThreads)处理服务(Service)消息队列的信息时,在srv->OnMsg()方法里,根据消息类型进行强转后,根据不同消息类型调用不同的处理函数,这些函数里,又会调用LuaAPI函数,然后执行Lua代码。
总结:
Lua代码被执行的地方有:
- 服务(Service)被创建时
OnInit() - 工作线程(WorkerThreads)处理服务(Service)消息时:
OnServiceMsg()(服务间信息)、OnAcceptMsg()(接收到新客户端连接)、OnSocketData()(收到客户端信息)、OnSocketClose()(关闭与客户端连接)、 - 服务(Service)被杀掉时
OnExit()
2.2.2 Service之间的通信
Service之间的通信是调用Sunnet的Send()方法,代码如下:
void Sunnet::Send(uint32_t toId, std::shared_ptr<BaseMsg> msg) { std::shared_ptr<Service> toSrv = GetService(toId); if (!toSrv) { std::cout << "Send fail, toSrv not exist toId:" << toId << std::endl; return; } //插入目标服务器的消息队列 toSrv->PushMsg(msg); //检查并放入全局队列 bool hasPush = false; pthread_spin_lock(&toSrv->inGlobalLock); { if (!toSrv->inGlobal) { PushGlobalQueue(toSrv); toSrv->inGlobal = true; hasPush = true; } } pthread_spin_unlock(&toSrv->inGlobalLock); //唤醒进程 if(hasPush) { CheckAndWeekUp(); } }
如上代码所示,服务(Service)之间的通信是非常巧妙的,因为Sunnet管理了所有的Service,所以通过方法std::shared_ptr<Service> toSrv = GetService(toId)直接可以找到要发信息的目标Service。
如上面说到的,Sunnet是静态类,在内存的静态储存区中只存在一个Sunnet对象。因此发送信息给目标Service的消息队列就变成了,直接通过目标Service的id在Sunnet的Services里找到目标Service对象,然后目标Service对象把信息插入到自己的消息队列里就可以了。
三、注意事项
3.1 队列的加锁操作
- 对全局队列(globalQueue)的操作由于涉及多线程竞争问题(多个工作线程和一个网络线程),使用自旋锁pthread_spinlock_t。
- 对某个服务(Service)的消息列表的操作,也涉及多线程竞争问题(多个工作线程之间互相调用Sunnet::Send),使用读写锁pthread_spinlock_t
- 对某个服务(Service)的操作(新增服务、删除服务),也涉及多线程竞争问题(某个工作线程对服务进行处理信息操作时,某个工作线程要删除服务),由于多读少写的特性,使用读写锁pthread_rwlock_t
- 对某个Conn的操作(新增Conn、删除Conn),也涉及多线程竞争问题(某个工作线程创建Conn、某个工作线程要关闭Conn),由于多读少写的特性,使用读写锁pthread_rwlock_t
3.2 生产者消费者
在Sunnet系统中,生产者是一个网络线程(SocketThread),当有客户端信息到来,就把信息插入到对应服务(Service)的消息队列,然后把服务(Service)插入到全局队列(globalQueue)。
在Sunnet系统中,消费者是多个工作线程(WorkerThreads),当全局队列不为空,把服务(Service)从全局队列(globalQueue)里拿出来消费。
生产者消费者之间的池是全局队列(globalQueue)。
使用条件变量+互斥锁实现生产者和消费者之间的沉睡和唤醒:
//Worker线程调用,进入休眠 void Sunnet::WorkerWait() { pthread_mutex_lock(&sleepMtx); sleepCount++; pthread_cond_wait(&sleepCond, &sleepMtx); //条件变量sleepCond,互斥锁sleepMtx sleepCount--; pthread_mutex_unlock(&sleepMtx); } //唤醒工作线程 void Sunnet::CheckAndWeekUp() { //unsafe if(sleepCount == 0) { return; } if(WORKER_NUM - sleepCount <= globalLen) { std::cout << "weakup" << std::endl; pthread_cond_signal(&sleepCond); //条件变量sleepCond } }
消费者:工作线程(WorkerThreads)通过WorkerWait()沉睡。
生产者:网络线程(SocketThread)通过CheckAndWeekUp()唤醒工作线程(WorkerThreads)。
3.3 创建epoll对象
epoll_create是创建epoll对象的方法,就如Socket对象一样,epoll对象也是由操作系统管理的。用户可以使用系统提供的API来操作它。如果创建成功,则epoll_create返回epoll对象的描述符给进程的文件描述符;如果创建失败,则返回-1。
3.4 epoll更改监听事件
epoll_ctl 将可读事件和可写事件分开,是出于性能的考量,因为监听的事件越少,性能就会越高。
一般情况下,只需要关注可读事件即可,只有在“消息发送失败”后,才需要关注可写事件。
对于一个客户端连接,服务端会不停地更改要监听的事件,以求达到最高的性能。
3.5 epoll事件的边缘触发和水平触发
新数据到达时,无论使用的是水平触发模式还是边缘触发模式,epoll对象都会唤醒服务端。
3.5 后台启动
方法一:nohup表示忽略所有挂断(SIGHUP信号),&表示后台运行。
nohup ./sunnet &
方法二:创建守护进程。让程序转入后台运行,就算断开终端(SSH会话)也不会中断程序。因为创建守护进程后该进程忽略了SIGHUP信号。
int main() { ... daemon(0, 0); ... }
3.6 屏蔽SIGPIPE信号
Linux系统有一个坑。在TCP的设计中,发送端向“套接字信息不匹配”的接收端发送数据时,接收端会回应复位信息(RST)。例如,发送端向已销毁套接字的接收端发送数据时,发送端就会收到复位信号。
在Linux系统中,对“收到复位(RST)信号的套接字”调用write时,操作系统会向进程发送SIGPIPE信号,默认处理动作是终止进程。
解决方法是忽略SIGPIPE信号:
#include <signal> void Sunnet::Start() { //忽略SIGPIPE信号 signal(SIGPIPE, SIG_IGN); ... }
3.7 封装socket的读写缓冲区
套接字的读写缓冲区容量有限,以至于常常不能完整发送数据或者完整接收全部数据。
解决方法一:设置SNDBUFFORCE。Linux提供的setsockopt方法,可将套接字缓冲区设置大小。Linux系统会按需分配空间,不过缺点是如果部分玩家套接字缓冲区占据了GB级别的内存空间,那么游戏服务器承载量极大降低,甚至因为内存不足二早早挂掉,因此该值建议不用修改。
void SocketWorker::OnAccept(std::shared_ptr<Conn> conn) { //步骤1:accept int clientFd = accept(conn->fd, NULL, NULL); //此时操作系统内核会创建一个新的套接字结构,代表该客户端连接,并返回它的文件描述符。 if(clientFd < 0) { std::cout << "accpet error" << std::endl; } //步骤2:设置非阻塞 fcntl(clientFd, F_SETFL, O_NONBLOCK); //设置写缓冲区大小。注意一般不用修改该值。因为可能存在玩家对应socket写缓冲区占据太多内存会导致服务器内存不足而dump掉 unsigned long buffSize = ; // 4G if(setsockopt(clientFd, SOL_SOCKET, SO_SNDBUFFORCE, &buffSize, sizeof(buffSize)) < 0) { std::cout << "OnAccept setsockopt Fail " << strerror(errno) << std::endl; } ... }
解决方法二:
自己实现读写缓冲区,对操作系统底层API:send()、write()进行进一步的封装。
下面以写缓冲区为例:
#pragma once #include <list> #include <stdint.h> #include <memory> //写缓冲区类 class WriteObject { public: std::streamsize start; //代表已经写入套接字写缓冲区的字节数 std::streamsize len; //代表需要发送的总字节数 std::shared_ptr<char> buff; }; class ConnWriter { public: int fd; private: bool isClosing = false; //是否正在关闭 std::list<std::shared_ptr<WriteObject>> objs; //双向链表,保存所有尚未发送成功的数据。 public: void EntireWrite(std::shared_ptr<char> buff, std::streamsize len); //尝试按序发送数据,如果未能全部发送,把未发送的数据存入objs列表。 void LingerClose(); //延迟关闭的方法,调用该方法后,如果ConnWriter尚有待发送的数据,则ConnWriter会先把数据发送完,最后才关闭连接 void OnWriteable(); //再次尝试发送剩余的数据 private: void EntireWriteWhenEmpty(std::shared_ptr<char> buff, std::streamsize len); void EntireWriteWhenNotEmpty(std::shared_ptr<char> buff, std::streamsize len); bool WriteFrontObj(); };
如上代码所示,通过list存储每次发送失败的数据。通过WriteObject的start和len记录某次发送的buff的长度,以便完整发送某次的全部数据。
例如:服务端发送“hahaha”、“hehehe”、“ooooo”三条消息给客户端。然而只发送成功“hah”,那么“hahaha”、“hehehe”、“ooooo”这三条消息都会存入list,list中的第一个WriteObject是“hahaha”,其中start是3,len是6,这表示第一个WriteObject并没有发送完,还需要把剩下的的“aha”发送后,list变成剩下“hehehe”、“ooooo”这两个WriteObject没发送。
四、项目地址
github:
https://github.com/hhhhhhh12123/Sunnet
gitee:
https://gitee.com/smallppppig/sunnet
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/114763.html






