大家好,欢迎来到IT知识分享网。
bool CIOCPServer::Initialize(NOTIFYPROC pNotifyProc, CMainFrame* pFrame, int nMaxConnections, int nPort) {
m_pNotifyProc = pNotifyProc; m_pFrame = pFrame; m_nMaxConnections = nMaxConnections; m_socListen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); if (m_socListen == INVALID_SOCKET) {
TRACE(_T("Could not create listen socket %ld\n"), WSAGetLastError()); return false; } // 创建Event事件,用于线程间的同步 m_hEvent = WSACreateEvent(); if (m_hEvent == WSA_INVALID_EVENT) {
TRACE(_T("WSACreateEvent() error %ld\n"), WSAGetLastError()); closesocket(m_socListen); return false; } // The listener is ONLY interested in FD_ACCEPT // That is when a client connects to or IP/Port // Request async notification /* WSAEventSelect模型是WindowsSockets提供的一个有用异步I/O模型。 该模型允许在一个或者多个套接字上接收以事件为基础的网络事件通知。 Windows Sockets应用程序在创建套接字后,调用WSAEventSelect()函数,将一个事件对象与网络事件集合关联在一起。 当网络事件发生时,应用程序以事件的形式接收网络事件通知。 WSAEventSelect模型简单易用,也不需要窗口环境。 该模型唯一的缺点是有最多等待64个事件对象的限制(Linux中SELECT模型最多可以同时处理128个事件对象),当套接字连接数量增加时,就必须创建多个线程来处理I/O,也就是所谓的线程池。 */ //将m_hEvent事件绑定吧到m_socListen套接字上,用于监听FD_ACCEPT类型的事件 int nRet = WSAEventSelect(m_socListen, m_hEvent,FD_ACCEPT); if (nRet == SOCKET_ERROR) {
TRACE(_T("WSAAsyncSelect() error %ld\n"), WSAGetLastError()); closesocket(m_socListen); return false; } SOCKADDR_IN saServer; // Listen on our designated Port# saServer.sin_port = htons(nPort); // Fill in the rest of the address structure saServer.sin_family = AF_INET; saServer.sin_addr.s_addr = INADDR_ANY; // bind our name to the socket nRet = bind(m_socListen, (LPSOCKADDR)&saServer, sizeof(struct sockaddr)); if (nRet == SOCKET_ERROR) {
DWORD dwErr = GetLastError(); closesocket(m_socListen); return false; } // Set the socket to listen nRet = listen(m_socListen, SOMAXCONN); if (nRet == SOCKET_ERROR) {
TRACE(_T("listen() error %ld\n"), WSAGetLastError()); closesocket(m_socListen); return false; } UINT dwThreadId = 0; //创建一个监听线程专门用来监听客户端的连接请求 m_hThread = (HANDLE)_beginthreadex(NULL, // Security 0, // Stack size - use default ListenThreadProc, // Thread fn entry point (void*) this, 0, // Init flag &dwThreadId); // Thread address if (m_hThread != INVALID_HANDLE_VALUE) {
//初始化IOCP模型 InitializeIOCP(); m_bInit = true; return true; } return false; }
因为此时客户端并没有发起连接远程主机的请求,所以主线程创建完工作线程——ListenThreadProc,ListenThreadProc便一直处于while(1)循环中,反复判断m_hkillEvent事件是否有信号(主线程和工作线程之间通过Event事件实现同步)、是否有IO事件的产生。
unsigned CIOCPServer::ListenThreadProc(LPVOID lParam) { CIOCPServer* pThis = reinterpret_cast<CIOCPServer*>(lParam); WSANETWORKEVENTS events; while (1) { *if (WaitForSingleObject(pThis->m_hKillEvent, 100) == WAIT_OBJECT_0) break;* dwRet = WSAWaitForMultipleEvents(1, &pThis->m_hEvent, FALSE, 100, FALSE); //超时的话,则继续下一轮的循环 if (dwRet == WSA_WAIT_TIMEOUT) continue; int nRet = WSAEnumNetworkEvents(pThis->m_socListen, pThis->m_hEvent, &events); if (nRet == SOCKET_ERROR) { TRACE(_T("WSAEnumNetworkEvents error %ld\n"), WSAGetLastError()); break; } if (events.lNetworkEvents & FD_ACCEPT) { //如果发生了可读事件,那么就开始调用OnAccept函数执行数据的读取 if (events.iErrorCode[FD_ACCEPT_BIT] == 0) pThis->OnAccept(); else { TRACE(_T("Unknown network event error %ld\n"), WSAGetLastError()); break; } } } return 0; }
当gh0st_server发出关闭窗口的命令(也即主线程发起退出线程的请求),m_hkillEvent将被设置成受信状态。
void CIOCPServer::Stop() {
::SetEvent(m_hKillEvent); //等待子线程退出,主线程才关闭线程句柄m_thread和事件句柄m_hKillEvent WaitForSingleObject(m_hThread, INFINITE); CloseHandle(m_hThread); CloseHandle(m_hKillEvent); }
当工作线程执行到如下代码处:
if (WaitForSingleObject(pThis->m_hKillEvent, 100) == WAIT_OBJECT_0) break
#define NC_CLIENT_CONNECT 0x0001 #define NC_CLIENT_DISCONNECT 0x0002 #define NC_TRANSMIT 0x0003 #define NC_RECEIVE 0x0004 #define NC_RECEIVE_COMPLETE 0x0005 // 完整接收 class CLock {
public: CLock(CRITICAL_SECTION& cs, const CString& strFunc) {
m_strFunc = strFunc; m_pcs = &cs; Lock(); } ~CLock() {
Unlock(); } void Unlock() {
LeaveCriticalSection(m_pcs); TRACE(_T("LC %d %s\n") , GetCurrentThreadId() , m_strFunc); } void Lock() {
TRACE(_T("EC %d %s\n") , GetCurrentThreadId(), m_strFunc); EnterCriticalSection(m_pcs); } protected: CRITICAL_SECTION* m_pcs; CString m_strFunc; }; //声明IO事件 enum IOType {
IOInitialize, IORead, IOWrite, IOIdle }; class OVERLAPPEDPLUS {
public: OVERLAPPED m_ol; IOType m_ioType; OVERLAPPEDPLUS(IOType ioType) {
ZeroMemory(this, sizeof(OVERLAPPEDPLUS)); m_ioType = ioType; } }; //客户端context类,包括客户端socket、收发缓冲区 struct ClientContext {
SOCKET m_Socket; // Store buffers CBuffer m_WriteBuffer; //自定义的内存池类CBuffer CBuffer m_CompressionBuffer; // 接收到的压缩的数据 CBuffer m_DeCompressionBuffer; // 解压后的数据 CBuffer m_ResendWriteBuffer; // 上次发送的数据包,接收失败时重发时用 int m_Dialog[2]; // 放对话框列表用,第一个int是类型,第二个是CDialog的地址 int m_nTransferProgress; // Input Elements for Winsock WSABUF m_wsaInBuffer; BYTE m_byInBuffer[8192]; // Output elements for Winsock WSABUF m_wsaOutBuffer; HANDLE m_hWriteComplete; // Message counts... purely for example purposes LONG m_nMsgIn; LONG m_nMsgOut; BOOL m_bIsMainSocket; // 是不是主socket ClientContext* m_pWriteContext; ClientContext* m_pReadContext; }; template<> inline UINT AFXAPI HashKey(CString & strGuid) {
return HashKey( (LPCTSTR) strGuid); } #include "Mapper.h" typedef void (CALLBACK* NOTIFYPROC)(LPVOID, ClientContext*, UINT nCode); typedef CList<ClientContext*, ClientContext*& > ContextList; class CMainFrame; class CIOCPServer {
public: void DisconnectAll(); CIOCPServer(); virtual ~CIOCPServer(); NOTIFYPROC m_pNotifyProc; CMainFrame* m_pFrame; bool Initialize(NOTIFYPROC pNotifyProc, CMainFrame* pFrame, int nMaxConnections, int nPort); //标准的windows线程函数的申明,加static就是为了使得线程函数不能有类的this指针,因为线程函数是按照stdcall的方式进行调用 static unsigned __stdcall ListenThreadProc(LPVOID lpVoid); static unsigned __stdcall ThreadPoolFunc(LPVOID WorkContext); static CRITICAL_SECTION m_cs; void Send(ClientContext* pContext, LPBYTE lpData, UINT nSize); void PostRecv(ClientContext* pContext); bool IsRunning(); void Shutdown(); void ResetConnection(ClientContext* pContext); LONG m_nCurrentThreads; LONG m_nBusyThreads; UINT m_nSendKbps; // 发送即时速度 UINT m_nRecvKbps; // 接受即时速度 UINT m_nMaxConnections; // 最大连接数 protected: void InitializeClientRead(ClientContext* pContext); BOOL AssociateSocketWithCompletionPort(SOCKET device, HANDLE hCompletionPort, DWORD dwCompletionKey); void RemoveStaleClient(ClientContext* pContext, BOOL bGraceful); void MoveToFreePool(ClientContext *pContext); ClientContext* AllocateContext(); LONG m_nWorkerCnt; bool m_bInit; bool m_bDisconnectAll; BYTE m_bPacketFlag[5]; void CloseCompletionPort(); void OnAccept(); bool InitializeIOCP(void); void Stop(); ContextList m_listContexts; ContextList m_listFreePool; WSAEVENT m_hEvent; SOCKET m_socListen; HANDLE m_hKillEvent; HANDLE m_hThread; HANDLE m_hCompletionPort; bool m_bTimeToKill; CCpuUsage m_cpu; LONG m_nKeepLiveTime; // 心跳超时 // Thread Pool Tunables LONG m_nThreadPoolMin; LONG m_nThreadPoolMax; LONG m_nCPULoThreshold; LONG m_nCPUHiThreshold; CString GetHostName(SOCKET socket); void CreateStream(ClientContext* pContext); BEGIN_IO_MSG_MAP() IO_MESSAGE_HANDLER(IORead, OnClientReading) IO_MESSAGE_HANDLER(IOWrite, OnClientWriting) IO_MESSAGE_HANDLER(IOInitialize, OnClientInitializing) END_IO_MSG_MAP() bool OnClientInitializing (ClientContext* pContext, DWORD dwSize = 0); bool OnClientReading (ClientContext* pContext, DWORD dwSize = 0); bool OnClientWriting (ClientContext* pContext, DWORD dwSize = 0); };
当有网络事件产生时,如何去收发数据、如何解包等关键业务逻辑将在下一节中进行讲解。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/118931.html