大数跨境
0
0

windows完成端口(五)

windows完成端口(五) CppGuide
2018-04-18
0
导读:系列目录windows完成端口(一)windows完成端口(二)windows完成端口(三)windows完

系列目录

windows完成端口(一)

windows完成端口(二)

windows完成端口(三)

windows完成端口(四)

windows完成端口(五)

windows完成端口(六)

#include "StdAfx.h"  
#include "IOCPModel.h"  
#include "MainDlg.h"  

// 每一个处理器上产生多少个线程(为了最大限度的提升服务器性能,详见配套文档)  
#define WORKER_THREADS_PER_PROCESSOR 2  
// 同时投递的Accept请求的数量(这个要根据实际的情况灵活设置)  
#define MAX_POST_ACCEPT              10  
// 传递给Worker线程的退出信号  
#define EXIT_CODE                    NULL  


// 释放指针和句柄资源的宏  

// 释放指针宏  
#define RELEASE(x)
{if(x != NULL ){delete x;x=NULL;}}  
// 释放句柄宏  
#define RELEASE_HANDLE(x)
{if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}  
// 释放Socket宏  
#define RELEASE_SOCKET(x)
{if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}}  



CIOCPModel::CIOCPModel(void):  
                           m_nThreads(0),  
                           m_hShutdownEvent(NULL),  
                           m_hIOCompletionPort(NULL),  
                           m_phWorkerThreads(NULL),  
                           m_strIP(DEFAULT_IP),  
                           m_nPort(DEFAULT_PORT),  
                           m_pMain(NULL),  
                           m_lpfnAcceptEx( NULL ),  
                           m_pListenContext( NULL )  
{  
}  


CIOCPModel::~CIOCPModel(void)  
{  
   // 确保资源彻底释放  
   this->Stop();  
}  




///////////////////////////////////////////////////////////////////  
// 工作者线程:  为IOCP请求服务的工作者线程  
//         也就是每当完成端口上出现了完成数据包,就将之取出来进行处理的线程  
///////////////////////////////////////////////////////////////////  

DWORD WINAPI CIOCPModel::_WorkerThread(LPVOID lpParam)  
{      
   THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam;  
   CIOCPModel* pIOCPModel = (CIOCPModel*)pParam->pIOCPModel;  
   int nThreadNo = (int)pParam->nThreadNo;  

   pIOCPModel->_ShowMessage(_T("工作者线程启动,ID: %d."),nThreadNo);  

   OVERLAPPED           *pOverlapped = NULL;  
   PER_SOCKET_CONTEXT   *pSocketContext = NULL;  
   DWORD                dwBytesTransfered = 0;  

   // 循环处理请求,知道接收到Shutdown信息为止  
   while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0))  
   {  
       BOOL bReturn = GetQueuedCompletionStatus(  
           pIOCPModel->m_hIOCompletionPort,  
           &dwBytesTransfered,  
           (PULONG_PTR)&pSocketContext,  
           &pOverlapped,  
           INFINITE);  

       // 如果收到的是退出标志,则直接退出  
       if ( EXIT_CODE==(DWORD)pSocketContext )  
       {  
           break;  
       }  

       // 判断是否出现了错误  
       if( !bReturn )    
       {    
           DWORD dwErr = GetLastError();  

           // 显示一下提示信息  
           if( !pIOCPModel->HandleError( pSocketContext,dwErr ) )  
           {  
               break;  
           }  

           continue;    
       }    
       else    
       {    
           // 读取传入的参数  
           PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped,
                                        PER_IO_CONTEXT,
                                        m_Overlapped);    

           // 判断是否有客户端断开了  
           if((0 == dwBytesTransfered) && ( RECV_POSTED==pIoContext->m_OpType ||
                                           SEND_POSTED==pIoContext->m_OpType))    
           {    
               pIOCPModel->_ShowMessage( _T("客户端 %s:%d 断开连接."),
                                             inet_ntoa(pSocketContext->m_ClientAddr.sin_addr),
                                             ntohs(pSocketContext->m_ClientAddr.sin_port) );  

               // 释放掉对应的资源  
               pIOCPModel->_RemoveContext( pSocketContext );  

               continue;    
           }    
           else  
           {  
               switch( pIoContext->m_OpType )    
               {    
                    // Accept    
               case ACCEPT_POSTED:  
                   {  

                       // 为了增加代码可读性,这里用专门的_DoAccept函数进行处理连入请求  
                       pIOCPModel->_DoAccpet( pSocketContext, pIoContext );                      


                   }  
                   break;  

                   // RECV  
               case RECV_POSTED:  
                   {  
                       // 为了增加代码可读性,这里用专门的_DoRecv函数进行处理接收请求  
                       pIOCPModel->_DoRecv( pSocketContext,pIoContext );  
                   }  
                   break;  

                   // SEND  
                   // 这里略过不写了,要不代码太多了,不容易理解,Send操作相对来讲简单一些  
               case SEND_POSTED:  
                   {  

                   }  
                   break;  
               default:  
                   // 不应该执行到这里  
                   TRACE(_T("_WorkThread中的 pIoContext->m_OpType 参数异常.\n"));  
                   break;  
               } //switch  
           }//if  
       }//if  

   }//while  

   TRACE(_T("工作者线程 %d 号退出.\n"),nThreadNo);  

   // 释放线程参数  
   RELEASE(lpParam);    

   return 0;  
}  



//====================================================================================  
//  
//                  系统初始化和终止  
//  
//====================================================================================  




////////////////////////////////////////////////////////////////////  
// 初始化WinSock 2.2  
bool CIOCPModel::LoadSocketLib()  
{      
   WSADATA wsaData;  
   int nResult;  
   nResult = WSAStartup(MAKEWORD(2,2), &wsaData);  
   // 错误(一般都不可能出现)  
   if (NO_ERROR != nResult)  
   {  
       this->_ShowMessage(_T("初始化WinSock 2.2失败!\n"));  
       return false;  
   }  

   return true;  
}  

//////////////////////////////////////////////////////////////////  
//  启动服务器  
bool CIOCPModel::Start()  
{  
   // 初始化线程互斥量  
   InitializeCriticalSection(&m_csContextList);  

   // 建立系统退出的事件通知  
   m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);  

   // 初始化IOCP  
   if (false == _InitializeIOCP())  
   {  
       this->_ShowMessage(_T("初始化IOCP失败!\n"));  
       return false;  
   }  
   else  
   {  
       this->_ShowMessage(_T("\nIOCP初始化完毕\n."));  
   }  

   // 初始化Socket  
   if( false==_InitializeListenSocket() )  
   {  
       this->_ShowMessage(_T("Listen Socket初始化失败!\n"));  
       this->_DeInitialize();  
       return false;  
   }  
   else  
   {  
       this->_ShowMessage(_T("Listen Socket初始化完毕."));  
   }  

   this->_ShowMessage(_T("系统准备就绪,等候连接....\n"));  

   return true;  
}  


////////////////////////////////////////////////////////////////////  
//  开始发送系统退出消息,退出完成端口和线程资源  
void CIOCPModel::Stop()  
{  
   if( m_pListenContext!=NULL && m_pListenContext->m_Socket!=INVALID_SOCKET )  
   {  
       // 激活关闭消息通知  
       SetEvent(m_hShutdownEvent);  

       for (int i = 0; i < m_nThreads; i++)  
       {  
           // 通知所有的完成端口操作退出  
           PostQueuedCompletionStatus(m_hIOCompletionPort,
                                       0, (DWORD)EXIT_CODE,
                                       NULL);  
       }  

       // 等待所有的客户端资源退出  
       WaitForMultipleObjects(m_nThreads,
                              m_phWorkerThreads,
                              TRUE, INFINITE);  

       // 清除客户端列表信息  
       this->_ClearContextList();  

       // 释放其他资源  
       this->_DeInitialize();  

       this->_ShowMessage(_T("停止监听\n"));  
   }    
}  


////////////////////////////////  
// 初始化完成端口  
bool CIOCPModel::_InitializeIOCP()  
{  
   // 建立第一个完成端口  
   m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
                                                NULL, 0, 0 );  

   if ( NULL == m_hIOCompletionPort)  
   {  
       this->_ShowMessage(_T("建立完成端口失败!错误代码: %d!\n"),
                            WSAGetLastError());  
       return false;  
   }  

   // 根据本机中的处理器数量,建立对应的线程数  
   m_nThreads = WORKER_THREADS_PER_PROCESSOR * _GetNoOfProcessors();  

   // 为工作者线程初始化句柄  
   m_phWorkerThreads = new HANDLE[m_nThreads];  

   // 根据计算出来的数量建立工作者线程  
   DWORD nThreadID;  
   for (int i = 0; i < m_nThreads; i++)  
   {  
       THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;  
       pThreadParams->pIOCPModel = this;  
       pThreadParams->nThreadNo  = i+1;  
       m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread,
                                             (void *)pThreadParams,
                                              0, &nThreadID);  
   }  

   TRACE(" 建立 _WorkerThread %d 个.\n", m_nThreads );  

   return true;  
}  


/////////////////////////////////////////////////////////////////  
// 初始化Socket  
bool CIOCPModel::_InitializeListenSocket()  
{  
   // AcceptEx 和 GetAcceptExSockaddrs 的GUID,用于导出函数指针  
   GUID GuidAcceptEx = WSAID_ACCEPTEX;    
   GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;  

   // 服务器地址信息,用于绑定Socket  
   struct sockaddr_in ServerAddress;  

   // 生成用于监听的Socket的信息  
   m_pListenContext = new PER_SOCKET_CONTEXT;  

   // 需要使用重叠IO,必须得使用WSASocket来建立Socket,才可以支持重叠IO操作  
   m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM,
                                           0, NULL, 0,
                                           WSA_FLAG_OVERLAPPED);  
   if (INVALID_SOCKET == m_pListenContext->m_Socket)  
   {  
       this->_ShowMessage(_T("初始化Socket失败,错误代码: %d.\n"),
                              WSAGetLastError());  
       return false;  
   }  
   else  
   {  
       TRACE(_T("WSASocket() 完成.\n"));  
   }  

   // 将Listen Socket绑定至完成端口中  
   if( NULL== CreateIoCompletionPort( (HANDLE)m_pListenContext->m_Socket,
                                        m_hIOCompletionPort,
                                       (DWORD)m_pListenContext, 0))    
   {    
       this->_ShowMessage(_T("绑定 Listen Socket至完成端口失败!错误代码: %d/n"),
                             WSAGetLastError());  
       RELEASE_SOCKET( m_pListenContext->m_Socket );  
       return false;  
   }  
   else  
   {  
       TRACE(_T("Listen Socket绑定完成端口 完成.\n"));  
   }  

   // 填充地址信息  
   ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));  
   ServerAddress.sin_family = AF_INET;  
   // 这里可以绑定任何可用的IP地址,或者绑定一个指定的IP地址  
   ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);                        
   //ServerAddress.sin_addr.s_addr = inet_addr(CStringA(m_strIP).GetString());          
   ServerAddress.sin_port = htons(m_nPort);                            

   // 绑定地址和端口  
   if (SOCKET_ERROR == bind(m_pListenContext->m_Socket,
                           (struct sockaddr *) &ServerAddress,
                           sizeof(ServerAddress)))  
   {  
       this->_ShowMessage(_T("bind()函数执行错误.\n"));  
       return false;  
   }  
   else  
   {  
       TRACE(_T("bind() 完成.\n"));  
   }  

   // 开始进行监听  
   if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN))  
   {  
       this->_ShowMessage(_T("Listen()函数执行出现错误.\n"));  
       return false;  
   }  
   else  
   {  
       TRACE(_T("Listen() 完成.\n"));  
   }  

   // 使用AcceptEx函数,因为这个是属于WinSock2规范之外的微软另外提供的扩展函数  
   // 所以需要额外获取一下函数的指针,  
   // 获取AcceptEx函数指针  
   DWORD dwBytes = 0;    
   if(SOCKET_ERROR == WSAIoctl(  
       m_pListenContext->m_Socket,  
       SIO_GET_EXTENSION_FUNCTION_POINTER,  
       &GuidAcceptEx,  
       sizeof(GuidAcceptEx),  
       &m_lpfnAcceptEx,  
       sizeof(m_lpfnAcceptEx),  
       &dwBytes,  
       NULL,  
       NULL))    
   {    
       this->_ShowMessage(_T("WSAIoctl 未能获取AcceptEx函数指针。错误代码: %d\n"),
                          WSAGetLastError());  
       this->_DeInitialize();  
       return false;    
   }    

   // 获取GetAcceptExSockAddrs函数指针,也是同理  
   if(SOCKET_ERROR == WSAIoctl(  
       m_pListenContext->m_Socket,  
       SIO_GET_EXTENSION_FUNCTION_POINTER,  
       &GuidGetAcceptExSockAddrs,  
       sizeof(GuidGetAcceptExSockAddrs),  
       &m_lpfnGetAcceptExSockAddrs,  
       sizeof(m_lpfnGetAcceptExSockAddrs),    
       &dwBytes,  
       NULL,  
       NULL))    
   {    
       this->_ShowMessage(_T("WSAIoctl 未能获取GuidGetAcceptExSockAddrs函数指针。错误代码: %d\n"),
                              WSAGetLastError());  
       this->_DeInitialize();  
       return false;  
   }    


   // 为AcceptEx 准备参数,然后投递AcceptEx I/O请求  
   for( int i=0;i<MAX_POST_ACCEPT;i++ )  
   {  
       // 新建一个IO_CONTEXT  
       PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();  

       if( false==this->_PostAccept( pAcceptIoContext ) )  
       {  
           m_pListenContext->RemoveContext(pAcceptIoContext);  
           return false;  
       }  
   }  

   this->_ShowMessage( _T("投递 %d 个AcceptEx请求完毕"),MAX_POST_ACCEPT );  

   return true;  
}  

////////////////////////////////////////////////////////////  


由于公众号文章字数有限,您可以接着阅读下一篇:windows完成端口(六)



系列目录

windows完成端口(一)

windows完成端口(二)

windows完成端口(三)

windows完成端口(四)

windows完成端口(五)

windows完成端口(六)

欢迎关注公众号『easyserverdev』。如果有任何技术或者职业方面的问题需要我提供帮助,可通过这个公众号与我取得联系,此公众号不仅分享高性能服务器开发经验和故事,同时也免费为广大技术朋友提供技术答疑和职业解惑,您有任何问题都可以在微信公众号直接留言,我会尽快回复您。

【声明】内容源于网络
0
0
CppGuide
专注于高质量高性能C++开发,站点:cppguide.cn
内容 981
粉丝 0
CppGuide 专注于高质量高性能C++开发,站点:cppguide.cn
总阅读2
粉丝0
内容981