Hiredis 实现 Redis 流水线

栏目: 数据库 · 发布时间: 6年前

内容简介:Hiredis 实现 Redis 流水线

Pipelining(流水线)允许 Redis 客户端一次向 Redis 发送多个命令,Redis 在接收到这些命令后,按顺序进行处理,然后将请求的处理结果一次性返回给客户端。流水线可以减少客户端与 Redis 之间的网络通信次数来提升 Redis 客户端在发送多个命令时的性能,可谓提升客户端性能的一个利器。

我们熟悉的 Python 版本的 Redis 客户端 redis-py 提供了 StrictPipeline 对象来实现流水线,使用起来很是方便,具体用法可以参考文章 《Redis 事务学习笔记》 。作为 C/C++ 版本的 Redis 客户端,hiredis 实现流水线稍显有点复杂,不过通过使用 hiredis 来实现流水线却可以更深刻了解流水线的内部实现原理。

Hiredis 提供 redisCommand() 函数来向 Redis 服务端发送命令, redisCommand() 函数的原型如下:

void *redisCommand(redisContext *c, const char *format, ...);

redisCommand() 执行后,返回一个 redisReply * 指针,指向 redisReply 结构体,该结构体包含了返回的结果信息。

redisCommand() 函数是阻塞的(是指使用阻塞版的 redisContext 对象,下文我们同样有这个假定),每调用一次,都会等待 Redis 服务端的返回,然后再继续执行程序下面的逻辑。

redisCommand() 函数的使用示例如下所示,完整的代码和编译可以参考文章 《Redis C 语言客户端 hiredis 的使用》

redisReply *reply;
reply = redisCommand(conn, "SET %s %s", "foo", "bar");
freeReplyObject(reply);

reply = redisCommand(conn, "GET %s", "foo");
printf("%s\n", reply->str);
freeReplyObject(reply);

如果我们需要向 Redis 服务端发送多次命令,如果都是使用 redisCommand() 函数来发送,那么每次发送后都得等待返回结果后才能继续下一次发送,这性能显然不是我们能接受的。Hiredis 提供了 redisAppendCommand() 函数来实现流水线的命令发送方案。

int redisAppendCommand(redisContext *c, const char *format, ...);

redisAppendCommand() 函数执行成功时返回 REDIS_OK ,失败时返回 REDIS_ERR

#define REDIS_ERR -1
#define REDIS_OK 0

redisCommand() 函数一样, redisAppendCommand() 函数在 hiredis 中也有其他变体,这里为了描述的简便,仅以 redisCommand() 函数为例说明。

redisAppendCommand() 函数执行后,并没有立刻将命令发送到 Redis 执行,而是先将命令缓存到 redisContext 对象中。那么, redisContext 对象中被缓存起来的命令什么时候会被发送出去呢?Hiredis 提供了 redisGetReply() 函数来将缓存的命令发送出去的功能。 redisGetReply() 函数的处理过程如下:

  1. 查看 结果缓冲区 是否还有结果没被取出,如果有,则取出结果后直接返回;如果没有,则执行步骤2
  2. 命令缓冲区 的所有命令发送到 Redis 处理,然后一直等待,直到有一个 Redis 的处理结果返回

上面我们提到的 redisCommand() 函数执行后可以直接获取 Redis 的返回结果,这是由于其内部先调用 redisAppendCommand() 函数,然后再调用 redisGetReply() 函数实现的。

说到这里,hiredis 实现流水线的过程就很清晰了。无论 redisCommand() 函数还是 redisAppendCommand() 函数,都会先将命令缓存起来,然后再发送到 Redis 执行。不同的是 redisCommand() 函数会马上发送命令然后取得返回结果,而 redisAppendCommand() 函数则在调用 redisGetReply() 函数才将所有命令一次性发送,并取得第一个命令的返回结果。

下面是使用 redisAppendCommand() 函数实现流水线方案的示例。

redisReply *reply;
redisAppendCommand(context,"SET foo bar");
redisAppendCommand(context,"GET foo");
redisGetReply(context,&reply); // SET命令的返回
freeReplyObject(reply);
redisGetReply(context,&reply); // GET命令的返回
freeReplyObject(reply);

值得注意的是,调用 redisAppendCommand() 函数的次数需要与调用 redisGetReply() 的次数要一致,否则会出现获取的 Redis 处理结果跟预期不一致的情况。

// 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况
redisAppendCommand(conn, "get t");
// 本来想取得 set a ddd 的返回,却获取了 get t 的返回
reply = redisCommand(conn, "set a ddd");
printf("set a res: %s\n", reply->str);

输出的结果将会是 get t 命令的返回,而不是 set a ddd 命令的返回。

参考资料

  1. https://github.com/redis/hiredis
  2. https://gist.github.com/dspezia/1893378
  3. http://www.leoox.com/?p=316
  4. http://www.redis.cn/topics/pipelining.html

附:示例程序 testhiredis.c

编译:

gcc -o testhiredis testhiredis.c -L/usr/local/lib -lhiredis

执行:

./testhiredis

输出:

bar

res: OK

res: b

watch res: OK

res: OK, num: 0, type: 5

res: QUEUED, num: 0, type: 5

res: QUEUED, num: 0, type: 5

res: QUEUED, num: 0, type: 5

res: (null), num: 3, type: 2

set a res: tt

源程序:

#include <stdio.h>
#include <hiredis/hiredis.h>

int main() {
    // 阻塞 redisContext
    redisContext *conn = redisConnect("127.0.0.1", 6379);
    if (conn != NULL && conn->err) {
        printf("connection error: %s\n", conn->errstr);
        return 0;
    }

    // 使用 redisCommand 发送命令并获取返回
    redisReply *reply;
    reply = redisCommand(conn, "SET %s %s", "foo", "bar");
    freeReplyObject(reply);

    reply = redisCommand(conn, "GET %s", "foo");
    printf("%s\n", reply->str);
    freeReplyObject(reply);

    // 使用 redisAppendCommand 实现流水线
    redisAppendCommand(conn, "set a b");
    redisAppendCommand(conn,"get a");
    int r = redisGetReply(conn, (void **)&reply);
    if (r == REDIS_ERR) {
        printf("ERROR\n");
    }
    printf("res: %s\n", reply->str);
    freeReplyObject(reply);

    r = redisGetReply(conn, (void **)&reply);
    if (r == REDIS_ERR) {
        printf("ERROR\n");
    }
    printf("res: %s\n", reply->str);
    freeReplyObject(reply);

    // 使用 watch 命令监控键 a
    reply = redisCommand(conn, "watch a");
    printf("watch res: %s\n", reply->str);
    freeReplyObject(reply);

    // 事务流水线,总共5个命令
    redisAppendCommand(conn, "multi");
    redisAppendCommand(conn, "get foo");
    redisAppendCommand(conn, "set t tt");
    redisAppendCommand(conn, "set a aa");
    redisAppendCommand(conn, "exec");

    for (int i = 0; i < 5; ++i) {
        r = redisGetReply(conn, (void **)&reply);
        if (r == REDIS_ERR) {
            printf("ERROR\n");
        }
        printf("res: %s, num: %zu, type: %d\n", reply->str, reply->elements, reply->type);
        freeReplyObject(reply);
    }

    // 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况
    redisAppendCommand(conn, "get t");
    // 本来想取得 set a ddd 的返回,却获取了 get t 的返回
    reply = redisCommand(conn, "set a ddd");
    printf("set a res: %s\n", reply->str);

    redisFree(conn);

    return 0;
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Haskell函数式编程基础

Haskell函数式编程基础

Simon Thompson / 科学出版社 / 2013-7-1 / 129.00

《Haskell函数式编程基础(第3版)》是一本非常优秀的Haskell函数式程序设计的入门书,各章依次介绍函数式程序设计的基本概念、编译器和解释器、函数的各种定义方式、简单程序的构造、多态和高阶函数、诸如数组和列表的结构化数据、列表上的原始递归和推理、输入输出的控制处理、类型分类与检测方法、代数数据类型、抽象数据类型、惰性计算等内容。书中包含大量的实例和习题,注重程序测试、程序证明和问题求解,易......一起来看看 《Haskell函数式编程基础》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

html转js在线工具
html转js在线工具

html转js在线工具