Redis 5.0 Stream消息队列会每次唤醒所有同一个group内的消费者

栏目: 数据库 · 发布时间: 5年前

Redis stream支持多个消费者在同一个group里面消费,并且一条消息只会发送给一个消费者,不会被同组内的多个consumer获取。但有些特殊情况也不得不注意。

而当所有消费者都BLOCK在一个组内的时候,如果生产者生产一条消息,正常对于kafka来说不会唤醒其他消费者的,而 redis 目前的实现在这里,会将所有消费者都唤醒,只是只有第一个订阅者才能获取到这条消息,这点在使用的时候需要格外注意,性能也有问题,希望以后redis能改善这情况了。

下面详细说一下出现这种情况的原因。

0. handleClientsBlockedOnKeys 扫描订阅的消费者列表尝试发送消息

在之前的文章 Redis 5.0 重量级特性 Stream 实现源码分析(二)XREAD 消费流程 里面说过,所有消费者阻塞在一个队列上后,如果有新元素插入,就会触发到handleClientsBlockedOnKeys()函数。 函数会遍历所有等待在这个key上面的客户端列表,按照先后顺序看是否要给他发送消息。

list *clients = dictGetVal(de);
    listNode *ln;
    listIter li;
    listRewind(clients,&li);

    //下面循环所有等待的客户端列表,一个个判断他们是否跟当前key匹配,如果id等都合适,
    //那么调用streamReplyWithRange 去扫描发送消息。同时,如果有多个consumer等待在一个key上的情况
    //怎么处理呢? 不会发生重复吗? 答案当然不会,因为streamReplyWithRange每次回更新最大的last_id,
    //然后下回进来的时候第二个消费者其实不会有什么实际的操作发生
    while((ln = listNext(&li))) {
        client *receiver = listNodeValue(ln);
        //一个客户端可能block在多个key上? 是的
        //每个客户端的bpop结构里面记录了我都阻塞在了哪些key里面
        //但是,同时只能等待在某一类key上面,不能是多种。 因为blockClient 里面会重置btype, 且等待状态会拒绝其他请求
        if (receiver->btype != BLOCKED_STREAM) continue;
        //从这个客户端的等待的key列表里面找到这个key,然后比较id是否有新的,如果有新的就发送数据并且解开等待状态
        streamID *gt = dictFetchValue(receiver->bpop.keys, rl->key);
        if (s->last_id.ms > gt->ms ||
            (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq))
        {
            //、、、
                        //调用函数给这个group和consumer 发送指定位置的信息
            streamReplyWithRange(receiver,s,&start,NULL,
                                 receiver->bpop.xread_count,
                                 0, group, consumer, 0, &pi);

        }

每次循环的最后,就是调用streamReplyWithRange 函数发送该消费者指定的起始位置往后的消息。这里就得问了,那同组不重复的逻辑在哪实现的呢?

答案是在streamReplyWithRange里面,给客户端找消息之前,会判断当前客户端要求的id位置是不是比所属group的最大id要小,如果要小,就不会将后面的新消息发送给这个消费者。

1. streamReplyWithRange 扫描新消息,去重

来看一下streamReplyWithRange前面部分,对于如果指定了group消费, 参数ID如果小于这个群组的最大id的时候,是什么情况呢?

答案是要么这个客户端要求消费旧消息,要么是因为同时等待,然后第一个消费了后,后面的消费者其实都遇到这种情况。

size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamC
onsumer *consumer, int flags, streamPropInfo *spi) {
    //根据group和consumer参数,读取start到end的最多count个元素,可以反向读取
    void *arraylen_ptr = NULL;
    if (group && streamCompareID(start,&group->last_id) <= 0) {
        //这是个旧id,那么从待完成队列里面去找一下消息然后触发他们,不能发错了
        return streamReplyWithRangeFromConsumerPEL代码。

2. streamReplyWithRangeFromConsumerPEL 发送PEL旧消息

来看一下 streamReplyWithRangeFromConsumerPEL代码, 注意下面的注释。

/* This is an helper function for streamReplyWithRange() when called with
 * group and consumer arguments, but with a range that is referring to already
 * delivered messages. In this case we just emit messages that are already
 * in the history of the conusmer, fetching the IDs from its PEL.
 *
 * Note that this function does not have a  rev  argument because it s not
 * possible to iterate in reverse using a group. Basically this function
 * is only called as a result of the XREADGROUP command.
 *
 * This function is more expensive because it needs to inspect the PEL and then
 * seek into the radix tree of the messages in order to emit the full message
 * to the client. However clients only reach this code path when they are
 * fetching the history of already retrieved messages, which is rare. */
//上面作者说这个函数很少进来,只有客户端获取旧id消息的时候调用。
//但是还有一种情况,多个消费者等待同一个id的时候,除了第一个之外其他都会重复投递吧
//然后这个函数进行重试的时候,就会将所有客户端全唤醒,返回空
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
    //streamReplyWithRange 调用这里,由于客户端要求的ID比group的最大id小
    //为了避免一个ID发给多个人,所以这种时候调用这里来将这个consumer的待确认队列里面的内容全部发送出去
    //什么意思呢?就当他要求重发他的历史数据了。
    raxIterator ri;
    unsigned char startkey[sizeof(streamID)];
    unsigned char endkey[sizeof(streamID)];
    streamEncodeID(startkey,start);
    if (end) streamEncodeID(endkey,end);

    size_t arraylen = 0;
    void *arraylen_ptr = addDeferredMultiBulkLength(c);
    raxStart(&ri,consumer->pel);
    raxSeek(&ri,">=",startkey,sizeof(startkey));
    while(raxNext(&ri) && (!count || arraylen < count)) {
        if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
        streamID thisid;
        streamDecodeID(ri.key,&thisid);
        if (streamReplyWithRange(c,s,&thisid,NULL,1,0,NULL,NULL,
                                 STREAM_RWR_RAWENTRIES,NULL) == 0)
        {
            /* Note that we may have a not acknowledged entry in the PEL
             * about a message that s no longer here because was removed
             * by the user by other means. In that case we signal it emitting
             * the ID but then a NULL entry for the fields. */
            addReplyMultiBulkLen(c,2);
            streamID id;
            streamDecodeID(ri.key,&id);
            addReplyStreamID(c,&id);
            addReply(c,shared.nullmultibulk);
        } else {
            streamNACK *nack = ri.data;
            nack->delivery_time = mstime();
            nack->delivery_count++;
        }
        arraylen++;
    }
    raxStop(&ri);
    setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
    return arraylen;
}

streamReplyWithRange 调用streamReplyWithRangeFromConsumerPEL,由于客户端要求的ID比group的最大id小, 为了避免一个ID发给多个人,所以这种时候调用这里来将这个consumer的待确认队列里面的内容全部发送出去。

什么意思呢?就当他要求重发他的历史数据了!

看上面的英文注释,上面作者说这个函数很少进来,只有消费者获取旧id消息的时候调用。但是还有一种情况,多个消费者等待同一个id的时候,除了第一个之外其他都会重复投递吧? 然后这个函数进行重试的时候,就会将所有消费者全唤醒,后面的消费者返回空!

注意上面raxSeek 是从客户端要求的startid开始,这样 多消费者在group内消费的时候,id会不断加大,所以不可能存在PEL消息,比消费者要求的消息还小。也就是说,下面的while进不去。

但是呢,addDeferredMultiBulkLength 已经调用了,会导致给客户端发送空消息的,这样客户端就收到一个空数据包被唤醒,又得继续BLOCK 才行。适用方会很郁闷,来生产一条消息所有消费者都唤醒~

这种情况其实挺耗性能的。

3. 复现测试样例

再举个例子:

C1, C2 消费者消费同一个stream, 在一个group内。 按照如下顺序处理。

C1 C2 P1
XREADGROUP GROUP mygroup C1 count 10 block 10000000 streams streamname1 >
XREADGROUP GROUP mygroup C2 count 10 block 10000000 streams streamname1 >
阻塞 阻塞
XADD streamname1 * testf1 v1
唤醒,获得消息testf1 唤醒,获得空消息
C1需要重新XREADGROUP C2也需要重新XREADGROUP

由于目前这种机制存在,类似于惊群效应,所以其实对网络的消耗,以及消费者频繁被唤醒后还需要再次发送 XREADGROUP BLOCK 去等待消息,性能消耗可能是需要考虑的问题。使用的时候得注意。

Redis 5.0 Stream消息队列会每次唤醒所有同一个group内的消费者

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Effective JavaScript

Effective JavaScript

David Herman / Addison-Wesley Professional / 2012-12-6 / USD 39.99

"It's uncommon to have a programming language wonk who can speak in such comfortable and friendly language as David does. His walk through the syntax and semantics of JavaScript is both charming and h......一起来看看 《Effective JavaScript》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

随机密码生成器
随机密码生成器

多种字符组合密码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码