完成端口服务器模型

栏目: 后端 · 发布时间: 6年前

内容简介:前提:IOCP的整体编程模型跟上面的纯重叠io 非常类似.  纯重叠io使用OVERLAPPED  + APC函数完成.这种模型的缺点是必须让调用apc函数进入alterable状态. 而IOCP解决了这个问题.IOCP让我们自己创建一些线程,

前提:

IOCP的整体编程模型跟上面的纯重叠io 非常类似.  纯重叠io使用OVERLAPPED  + APC函数完成.

这种模型的缺点是必须让调用apc函数进入alterable状态. 而IOCP解决了这个问题.IOCP让我们自己创建一些线程,

然后调用GetQueuedCompletionStatus 来告诉我们某个io操作完成, 就像是在另一个线程中执行了APC函数一样;

使用IOCP 的时候,一般情况下需要自己创建额外的线程,用于等待结果完成(GetQueuedCompletionStatus)

使用到的函数:

CreateIoCompletionPort : 创建/ 关联一个完成端口 .

第3个参数是一个自定义数据, 第4个是最多N个线程可被调用;

注意与其关联的HANDLE 必须要有OVERLAPPED属性的

//创建一个完成端口
HANDLE hComp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)
 
 
//关联到完成端口. 第3个参数是一个自定义数据
//在GetQueuedCompletionStatus将携带这些数据返回. 这个自定义数据将一直与此套接字绑定在了一起
CreateIoCompletionPort((HANDLE)client_socket, hComp, (DWORD)pSockData, 0);

GetQueuedCompletionStatus :一旦类似WSARecv / WSASend 完成后 . 用此函数获取结果,就想APC函数一样,一旦完成io操作就调用. 此函数一般情况都在某一个线程中使用.注意一旦在某个线程中调用了此函数,这意味着,

该线程就像被指派给了IOCP一样,供IOCP使用. 总之这个行为就想APC函数在另一个线程被调用了;

关于解除关联: 一旦一个套接字关闭了 , closehandle /closesocket. 就将从IOCP的设备句柄列表中解除关联了

关于线程:

CreateIoCompletionPort  最后一个参数用于指定IOCP最多执行N个线程(如果是0 则使用默认CPU的核数). 但一般情况下,我会预留一些额外的线程.比如

我的CPU是4核即IOCP最多可使用 4个线程 , 不过一般情况下会创建 8 个线程,给IOCP预留 额外4个线程 . 原因是如果IOCP

有5个任务已经完成, 最多只有4个线程被唤醒. 如果其中某个线程调用了WaitForSingleObject 之类的函数 ,此时IOCP将唤醒额外的线程来处理第5个任务;

先补充一下. 对于WSARecv / WSASend 的OVERLAPPED操作,简称为投递操作.意思是让操作系统去干活,至于什么时候干完.

GetQueuedCompletionStatus 会通知你(即返回) . 因此因此, 需要注意, 这些参数像WSABUF 和 OVERLAPPED 一定要 new / malloc在堆中;

代码中都有注释: 另代码中有很多返回都没判断.这个例子仅仅解释如何编写IOCP

#include "stdafx.h"
#include <process.h>
#include "../utils.h" //包含了一些宏和一些打印错误信息的函数. 
 
#define BUFFSIZE 8192
#define Read 0
#define Write 1
 
//自定义数据 .  注意 结构的地址 与 第一个成员的地址相同
struct IOData
{
    WSAOVERLAPPED overlapped;  //每个io操作都需要独立的一个overlapped
    WSABUF wsabuf;   //读写各一份
    int rw_mode;         //判断读写操作
    char * buf;              //真正存放数据的地方, 需要初始化
};
 
//自定义数据.保存客户套接字和地址
struct SocketData
{
    SOCKET hClientSocket;        //客户端套接字
    SOCKADDR_IN clientAddr;
    IOData * pRead;                    //2个指针,只是为了在线程中方便使用添加的
    IOData * pWrite;
};
 
 
//用于交换2个buf
int swapBuf(WSABUF * a, WSABUF * b)
{
    BOOL ret = FALSE;
    if (a && b){
        char * buf = a->buf;
        a->buf = b->buf;
        b->buf = buf;
        ret = TRUE;
    }
    return ret;
}
 
//释放内存,解除关联
void freeMem(SocketData * pSockData)
{
    closesocket(pSockData->hClientSocket);
    free(pSockData->pRead->buf);
    free(pSockData->pWrite->buf);
    free(pSockData->pWrite);
    free(pSockData->pRead);
    free(pSockData);
}
 
 
unsigned int WINAPI completeRoutine(void * param)
{
    //完成端口
    HANDLE hCom = (HANDLE)param;
 
    SocketData * pSockData = NULL;
    IOData * pIOData = NULL;
    DWORD flags = 0, bytes = 0; 
    BOOL ret = 0;
    SOCKET hClientSocket = NULL;
    printf("tid:%ld start!\n", GetCurrentThreadId());
 
    while (1)
    {
        flags = 0;
        
        //直到有任务完成即返回
        ret = GetQueuedCompletionStatus(hCom, &bytes,
            (PULONG_PTR)&pSockData,
            (LPOVERLAPPED * )&pIOData,
            INFINITE);
        printf("GetQueuedCompletionStatus : %d , diy key : %p , pIOData:%p,mode:%d\n", ret, pSockData,
            pIOData,pIOData->rw_mode);
 
        //如果成功了
        if (ret)
        {
            hClientSocket = pSockData->hClientSocket;
 
            //如果是WSARecv的
            if (Read == pIOData->rw_mode)
            {
                printf("READ - > bytesRecved:%ld, high:%ld\n", bytes, pIOData->overlapped.InternalHigh);
 
                //对端关闭
                if (0 == bytes)
                {
                    printf("peer closed\n");
                    freeMem(pSockData);  //释放内存
                    continue;
                } 
 
                //测试数据 
                pSockData->pRead->buf[bytes] = 0;
                printf("Read buf:%s\n", pSockData->pRead->buf);
 
                //交换指针, 把recv的buf 给 write的buf;
                //把write的buf交换给recv . 如果并发量不大的时候可以这么做
                swapBuf(&pIOData->wsabuf, &pSockData->pWrite->wsabuf);
 
                //回传操作.清空write OVERLAPPED
                memset(&pSockData->pWrite->overlapped, 0, sizeof(WSAOVERLAPPED));
                pSockData->pWrite->wsabuf.len = bytes;
                WSASend(hClientSocket, &pSockData->pWrite->wsabuf,
                    1, NULL, 0, &pSockData->pWrite->overlapped, NULL);
 
                //再次投递一个recv操作,等待下次客户端发送
                memset(&pSockData->pRead->overlapped, 0, sizeof(WSAOVERLAPPED));
                pSockData->pRead->wsabuf.len = BUFFSIZE;
                WSARecv(hClientSocket, &pSockData->pRead->wsabuf, 1, NULL, &flags,
                    &pSockData->pRead->overlapped, NULL);
            }
            else {
 
                 // send 完成.
                printf("Send finsished - > bytes:%ld, high:%ld\n", bytes, pIOData->overlapped.InternalHigh);
                memset(&pIOData->overlapped, 0, sizeof(WSAOVERLAPPED));
            }
        }
        else{
            //一旦出错, 解除绑定即删除内存
            print_error(GetLastError());
            freeMem(pSockData);
        }
    }
 
    return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
    WSADATA wsadata;
    if (WSAStartup(MAKEWORD(2, 2), &wsadata) != 0){
        print_error(WSAGetLastError());
        return 0;
    }
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
 
    //指定线程数量. 一般 processors * 2
    const DWORD nThreads = sysinfo.dwNumberOfProcessors * 2;
 
    //创建一个完成端口 ,  前3个参数保证了创建一个独立的完成端口, 最后一个参数指定了完成
    //端口可使用的线程数. 0 使用当前cpu核数
    HANDLE hCom = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
 
    //准备一些线程供完成端口调用, 把完成端口同时传入
    HANDLE  * arr_threads = new HANDLE[nThreads];
    for (int i = 0; i < sysinfo.dwNumberOfProcessors; ++i)
        arr_threads[i] = (HANDLE)_beginthreadex(NULL, 0, completeRoutine, (void*)hCom, 0, NULL);
 
 
    //创建一个支持OVERLAPPED的socket.这样的属性将被 accept 返回的socket所继承
    SOCKET hListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    SOCKADDR_IN serv_addr, client_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(PORT);
    serv_addr.sin_addr.s_addr = INADDR_ANY;
 
    bind(hListenSocket, (SOCKADDR*)&serv_addr, sizeof(serv_addr));
    listen(hListenSocket, BACKLOG);
 
    SOCKET client_socket;
    int client_addr_size = 0;
    DWORD flags = 0;
    while (1){
        client_addr_size = sizeof(client_addr);
        flags = 0;
        client_socket = accept(hListenSocket, (SOCKADDR*)&client_addr, &client_addr_size);
        puts("accepted");
 
        //准备一份数据, 用于保存clientsocket, addr, 以及读写指针;
        SocketData * pSockData = (SocketData *)malloc(sizeof(SocketData));
        pSockData->pRead = NULL;
        pSockData->pWrite = NULL;
        pSockData->hClientSocket = client_socket;
        memcpy(&pSockData->clientAddr, &client_addr, client_addr_size);
 
        
        //准备数据
        IOData *  pRead = (IOData *)malloc(sizeof(IOData));
        //对于OVERLAPPED,需要额外注意,清0
        memset(&pRead->overlapped, 0, sizeof(WSAOVERLAPPED));
        pRead->buf = (char *)malloc(BUFFSIZE);
        pRead->rw_mode = Read;
        pRead->wsabuf.buf = pRead->buf;
        pRead->wsabuf.len = BUFFSIZE;
        pSockData->pRead = pRead;
 
        IOData *pWrite = (IOData *)malloc(sizeof(IOData));
        pWrite->buf = (char *)malloc(BUFFSIZE);
        memset(&pWrite->overlapped, 0, sizeof(WSAOVERLAPPED));
        pWrite->rw_mode = Write;
        pWrite->wsabuf.buf = pWrite->buf;
        pWrite->wsabuf.len = BUFFSIZE;
        pSockData->pWrite = pWrite;
 
 
        //与iocp关联在一起. 注意第3个参数, 把自定义数据一起传递过去
        CreateIoCompletionPort((HANDLE)client_socket, hCom, (DWORD)pSockData, 0);
        WSARecv(client_socket, &pRead->wsabuf, 1, NULL, &flags, &pRead->overlapped, NULL);
    }
 
    return 0;
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Machine Learning in Action

Machine Learning in Action

Peter Harrington / Manning Publications / 2012-4-19 / GBP 29.99

It's been said that data is the new "dirt"—the raw material from which and on which you build the structures of the modern world. And like dirt, data can seem like a limitless, undifferentiated mass. ......一起来看看 《Machine Learning in Action》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具