Redis 6.0 IO线程功能分析

栏目: IT技术 · 发布时间: 3年前

内容简介:从上图可知,

Redis多线程原理

Redis 6.0 的亮点之一就是支持多线程,Redis 分  主线程 和  IO线程IO线程 只用于读取客户端的命令和发送回复数据给客户端,处理客户端命令还是在  主线程 进行,如下图所示:

Redis 6.0 IO线程功能分析

从上图可知, 主线程 主要负责接收客户端连接,并且分发到各个  IO线程 ,而  IO线程 负责读取客户端命令。命令读取完成后,由  主线程 执行命令。 主线程 执行完命令后,再由  IO线程 把回复数据发送给客户端。

读者可能会问,为什么处理命令不在 IO线程 进行,我觉得主要有两个原因:

  • 如果处理命令在 IO线程 进行,那么就会涉及到竞争的问题。因为 Redis 的数据库是共享的,所以如果多个线程同时操作数据库,那么就必须要对数据库进行上锁,而上锁是一个比较耗时的操作(因为上锁可能会导致线程上下文切换)。

  • 由于 Redis 6.0 以前一直都是由单线程执行命令的,所以如果要改为多线程执行命令,那么需要修改大量代码,而且可能会引入新的问题(比如bug)。所以,为了稳定性,继续使用单线程执行命令是最好的选择。

为什么要使用多线程呢?主要为了使用多核CPU的优势,下面是使用多线程的测试数据(数据来源网络):

Redis 6.0 IO线程功能分析

Redis 6.0 IO线程功能分析

从上面的测试结果可以看出,多线程版本的 Redis 读写QPS都要比单线程版本的高。

Redis 多线程实现

要开启 Redis 的 IO线程 功能,可以在配置文件中加入以下配置项:

io-threads-do-reads yes     # 开启IO线程
io-threads 6                # 设置IO线程数

Redis 在启动时会根据配置文件中设置的 IO线程 数来启动  IO线程 ,启动  IO线程 在函数  initThreadedIO() 中完成,代码如下:

void initThreadedIO(void) {
    io_threads_active = 0;
    
    if (server.io_threads_num == 1) return;
    ...
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

initThreadedIO() 函数的主要工作是:

  • 为每个IO线程创建一个链表,用于放置要进行IO操作的客户端连接。

  • 为每个IO线程创建一个锁,用于主线程与IO线程的通信。

  • 调用 pthread_create() 系统调用来创建IO线程,IO线程的主体函数是  IOThreadMain()

下面我们来分析一下IO线程的主体函数主要完成的工作:

void *IOThreadMain(void *myid) {
    long id = (unsigned long)myid;
    ...
    while (1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        if (io_threads_pending[id] == 0) { // 不等于0表示有客户端连接需要处理
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
        ...
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;
        ...
    }
}

IO线程的主体函数主要完成以下几个操作:

  • 等待主线程分配客户端连接(对应IO线程的 io_threads_list 链表不为空)。

  • 判断当前是进行读操作还是写操作( io_threads_op  等于  IO_THREADS_OP_WRITE  表示要进行写操作,而  io_threads_op  等于  IO_THREADS_OP_READ  表示要进行读操作)。 如果是进行写操作,那么就调用  writeToClient()  函数向客户端连接进行发送数据。 如果是读操作,那么调用  readQueryFromClient()  函数读取客户端连接的请求。

  • 完成对客户端连接的读写操作后,需要清空对应IO线程的 io_threads_list 链表和计数器  io_threads_pending ,用于通知主线程已经完成读写操作。

那么,主线程是怎样分配客户端连接给各个IO线程的呢?

主线程在接收到客户端连接后,会把客户端连接添加到事件驱动库中监听其读事件,读事件的回调函数为 readQueryFromClient() 。也就是说,当客户端连接可读时会触发调用  readQueryFromClient() 函数,而  readQueryFromClient() 函数会调用  postponeClientRead() 函数判断当前 Redis 是否开启了  IO线程 功能,代码如下:

int postponeClientRead(client *c) {
    if (io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

postponeClientRead() 函数主要判断 Redis 是否开启了  IO线程 功能,如果开启了就调用  listAddNodeHead() 函数把客户端连接添加到  clients_pending_read 链表中,并且设置客户端连接的  CLIENT_PENDING_READ 标志位,表示当前连接已经在  clients_pending_read 链表中,防止二次添加。

把客户端连接添加到 clients_pending_read 链表后,主线程会在  handleClientsWithPendingReadsUsingThreads() 函数中把客户端连接分配给各个  IO线程 。代码如下:

int handleClientsWithPendingReadsUsingThreads(void) {
    ...
    /* 分配给各个IO线程 */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 设置各个IO线程负责的客户端连接数
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 主线程也要负责一部分客户端连接的读写操作
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // 等待所有IO线程完成
    while (1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    ...
    // 执行各个客户端连接的命令
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            if (processCommandAndResetClient(c) == C_ERR) {
                continue;
            }
        }
        processInputBuffer(c);
    }
    return processed;
}

handleClientsWithPendingReadsUsingThreads() 函数主要完成以下几个操作:

  • 分配客户端连接给各个 IO线程 (添加到对应  IO线程 的  io_threads_list 链表中),分配策略为轮询。

  • 设置各个 IO线程 负责的客户端连接数  io_threads_pending

  • 处理主线程负责那部分客户端连接的读写操作。

  • 等待所有 IO线程 完成读取客户端连接请求的命令。

  • 执行各个客户端连接请求的命令。

前面说过, IO线程 在完成读取客户端连接的请求后,会把  io_threads_pending 计数器清零,主线程就是通过检测  io_threads_pending 计数器来判断是否所有  IO线程 都完成了对客户端连接的读取命令操作。

但这里要吐槽一下的是,在等待 IO线程 读取客户端请求时,居然用了一个死循环来等待,这样有可能会导致CPU使用率飙升的问题,有可能影响其他服务的运行(不知道作者怎么想的)。我觉得比较合适的方式是,各个  IO线程 完成了读取命令操作后,通过一个信号来通知主线程。


以上所述就是小编给大家介绍的《Redis 6.0 IO线程功能分析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

国际大学生程序设计竞赛例题解

国际大学生程序设计竞赛例题解

郭嵩山 / 电子工业出版社 / 2006-5 / 32.0

《国际大学生程序设计竞赛例题解1:数论、计算几何、搜索算法专集》可以作为高等院校有关专业的研究生和本科学生参加国际大学生程序设计竞赛的辅导教材,也可作为高等院校有关专业相关课程的教材和教学参考书,也比较适合作为中学青少年信息学奥林匹克竞赛省级及省级以上优秀选手备战信息学奥林匹克竞赛的培训教材及训练题集。一起来看看 《国际大学生程序设计竞赛例题解》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具