发布在即!企业 AIGC 应用程度测评,3 步定制专属评估报告。抢首批测评权益>>> 了解详情
写点什么

“消失”的阻塞时间

  • 2019-09-22
  • 本文字数:7356 字

    阅读完需:约 24 分钟

“消失”的阻塞时间

1. 引言

偶然间发现 redis 的 BLPOP 命令的阻塞时间不是精确的,并且一般都是超出 100~500ms,多阻塞几次岂不是 1s 就没了?!这一行为成功引起了我的兴趣,于是乎就有了这篇对阻塞命令源码的探究。



本文以 BLPOP 命令为例探究一波内部实现(源码大军即将到来!!!)。

2. 设置阻塞 key

首先让我们跳过客户端连接和服务器初始化的步骤,直接看命令的执行流程。BLPOP 和 BRPOP 最终都会调用同一个通用函数 blockingPopGenericCommand。


在该函数中,如果 key 对应的 list 有值,阻塞命令的行为就同普通 pop 命令一致。反之,且 client 不在事务中时,就会调用 blockForKeys 函数进行设置阻塞操作。


在解析设置阻塞 key 的源码之前,需要先补充一点 redis 基础数据结构和单机实现的一些知识。


首先 redis 对外提供的 5 种数据结构实际上是由 redisObject 持有不同的底层数据结构实现的,本文会涉及到的有 dict、adlist 和 sds。其中 dict 是一个 hashTable 的封装,查找的时间复杂度为 O(1);adlist 是一个简单的双向链表,可以很方便的进行首尾遍历;sds 是 redis 自己实现的字符串,有动态扩容、兼容二进制数据等优势,基础数据结构的具体细节在这就不赘述。


除了这些基础的数据结构,redis 还定义了 redisServer、client 和 redisDb3 个数据结构用于保存 redis 服务、客户端和 db 的相关信息。一个 redis 服务只有一个 redisServer 实例,一个服务持有多个 redisDb 实例,而每个连接上的客户端都会在服务端初始化一个 client 实例,并且映射到一个 redisDb 实例上。


回到源码,redisServer、redisDb 和 client 的结构体为设置阻塞 key 专门准备了对应属性:


 1struct redisServer {    // 服务端 2    // ... 3    unsigned int bpop_blocked_clients; // 阻塞的client数量 4    // ... 5}; 6 7typedef struct redisDb {    // redisDb 8    // ... 9    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */10    // ...11} redisDb;1213typedef struct client { // 客户端14    // ...15    int btype;              /* Type of blocking op if CLIENT_BLOCKED. */16    blockingState bpop;     /* blocking state */17    // ... 18} client;1920typedef struct blockingState {21    mstime_t timeout;       // 阻塞超时时间22    dict *keys;             // 阻塞的key字典23    robj *target;           /* The key that should receive the element,24                             * for BRPOPLPUSH. */25    /* BLOCKED_WAIT */26    int numreplicas;        /* Number of replicas we are waiting for ACK. */27    long long reploffset;   /* Replication offset to reach. */28} blockingState;
复制代码


redisServer 中的 bpop_blocked_clients 只是简单的记录了阻塞的 client 数量,主要用于 info 的展示。


redisDb 中的 blocking_keys 是一个 dict,该 dict 的 key 为当前 db 阻塞的 key,每个 value 应着一个 client 的 adlist,如果多个 client 阻塞着同一个 key 会按照先后顺序添加到 adlist 尾部。


client 的 byte 标识了阻塞的类型,用于区分 BLPOP 和 BRPOPLPUSH 两种阻塞。bpop 记录了阻塞的各种属性,最主要的是 timeout 和 keys。


终于扯的差不多,可以把视线就转移到 blockForKeys 的源码了:


 1void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {    // 阻塞key 用于阻塞命令 2    dictEntry *de; 3    list *l; 4    int j; 5 6    c->bpop.timeout = timeout;  // 设置阻塞时间 7    c->bpop.target = target; 8 9    if (target != NULL) incrRefCount(target);1011    for (j = 0; j < numkeys; j++) {12        if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;    // 将阻塞的key添加到client的bpop.keys字典中13        incrRefCount(keys[j]); // 引用计数加一14        de = dictFind(c->db->blocking_keys,keys[j]);    // 查找db的blocking_keys中是否存在阻塞的key15        if (de == NULL) {16            int retval;17            l = listCreate();   // 创建一个adlist实例18            retval = dictAdd(c->db->blocking_keys,keys[j],l);   // 添加到blocking_keys的字典中19            incrRefCount(keys[j]);20            serverAssertWithInfo(c,keys[j],retval == DICT_OK);21        } else {22            l = dictGetVal(de);23        }24        listAddNodeTail(l,c);   // 在blocking_keys的对应list中添加上client25    }26    blockClient(c,BLOCKED_LIST);    // 设置客户端阻塞状态27}2829void blockClient(client *c, int btype) {30    c->flags |= CLIENT_BLOCKED;31    c->btype = btype;32    server.bpop_blocked_clients++;33}
复制代码


c->bpop.keys 是一个 value 为 NULL 的 dict,主要是防止阻塞的 key 重复。在设置完阻塞 key 后,client 的状态会变更为 CLIENT_BLOCKED 状态,在这种状态下正常弹出、超时或客户端断开才会断开连接。

3. 阻塞超时

蛤!?设置流程完成了,却完全没看到超时的处理步骤,看来只能跟随着 bpop.timeout 的脚步进行下一步排查了。万幸在 clientsCronHandleTimeout 函数中发现了蛛丝马迹。那么 clientsCronHandleTimeout 又是搞啥子的呢?还是要先补充一些 redis 单机的运行知识。


首先如果不考虑 BGSAVE 和 BGREWRITEAOF 的话,redis 实际上是一个单进程的服务。通过 IO 多路复用和事件循环同时处理多 client 的请求。


事件循环中的事件分为文件事件和时间事件,文件事件主要是各种网络请求的处理,时间事件是 redis 服务定期执行的一些事件,包括随机删除过期 key、rdbsave、主从同步等,时间事件的执行频率是根据配置中的 hz 参数来设定。


而 clientsCronHandleTimeout 就是时间事件中专门检测 client 超时的函数,如果当前 client 的状态为 CLIENT_BLOCKED 会就根据 bpop.timeout 进行超时判断:


 1int clientsCronHandleTimeout(client *c, mstime_t now_ms) {  // 校验客户端是否超时 2    time_t now = now_ms/1000; 3 4    if (server.maxidletime && 5        !(c->flags & CLIENT_SLAVE) &&    /* no timeout for slaves */ 6        !(c->flags & CLIENT_MASTER) &&   /* no timeout for masters */ 7        !(c->flags & CLIENT_BLOCKED) &&  /* no timeout for BLPOP */ 8        !(c->flags & CLIENT_PUBSUB) &&   /* no timeout for Pub/Sub clients */ 9        (now - c->lastinteraction > server.maxidletime))10    {   // 正常设置的client超时11        serverLog(LL_VERBOSE,"Closing idle client");12        freeClient(c);13        return 1;14    } else if (c->flags & CLIENT_BLOCKED) { // 阻塞超时15        if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {16            /* Handle blocking operation specific timeout. */17            replyToBlockedClientTimedOut(c);    // 设置超时返回18            unblockClient(c);   // 解除阻塞19        } else if (server.cluster_enabled) {    // 集群相关操作20            if (clusterRedirectBlockedClientIfNeeded(c))21                unblockClient(c);22        }23    }24    return 0;25}
复制代码


刨除一系列的校验,最核心的是 unblockClient 这个函数:


 1void unblockClient(client *c) { // 解除阻塞 2    if (c->btype == BLOCKED_LIST) {    // BLPOP和BRPOP的阻塞 3        unblockClientWaitingData(c); 4    } else if (c->btype == BLOCKED_WAIT) { 5        unblockClientWaitingReplicas(c); 6    } else { 7        serverPanic("Unknown btype in unblockClient()."); 8    } 910    c->flags &= ~CLIENT_BLOCKED;11    c->btype = BLOCKED_NONE;12    server.bpop_blocked_clients--;13    if (!(c->flags & CLIENT_UNBLOCKED)) {   // 将client添加到server的非阻塞list中14        c->flags |= CLIENT_UNBLOCKED;15        listAddNodeTail(server.unblocked_clients,c);16    }17}1819void unblockClientWaitingData(client *c) {  // 解除client的阻塞20    dictEntry *de;21    dictIterator *di;22    list *l;2324    serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);25    di = dictGetIterator(c->bpop.keys); // 获取阻塞的keys26    while((de = dictNext(di)) != NULL) {    // 遍历client阻塞的所有key27        robj *key = dictGetKey(de);2829        l = dictFetchValue(c->db->blocking_keys,key);30        serverAssertWithInfo(c,key,l != NULL);31        listDelNode(l,listSearchKey(l,c));  // 删除blocking_keys list中的client32        if (listLength(l) == 0) // 删除空list33            dictDelete(c->db->blocking_keys,key);34    }35    dictReleaseIterator(di);3637    dictEmpty(c->bpop.keys,NULL);   // 释放client的bpop.keys占用内存38    if (c->bpop.target) {39        decrRefCount(c->bpop.target);40        c->bpop.target = NULL;41    }42}
复制代码


redisServer 的 unblocked_clients 底层为一个 client 的 adlist。每个超时的 client 都会被记录在这个 adlist 中;超时的 client 中的 bpoop.keys 会被清空,同时 db 中的 blocking_keys 对应的 client 也会被删除。


好吧,流程执行到这又断了!还好有 server.unblocked_clients 作为线索,beforeSleep 作为其“包庇者”,会检测 server.unblocked_clients 是否为空,如果存在就给对应 client 返回超时并删除 list 中对应的 client:


 1void beforeSleep(struct aeEventLoop *eventLoop) {   // 每一次事件循环都会执行该函数 2    // ...     3    if (listLength(server.unblocked_clients)) 4        processUnblockedClients(); 5    // ... 6} 7 8void processUnblockedClients(void) {    // 向非阻塞状态的client返回值 9    listNode *ln;10    client *c;1112    while (listLength(server.unblocked_clients)) {13        ln = listFirst(server.unblocked_clients);14        serverAssert(ln != NULL);15        c = ln->value;16        listDelNode(server.unblocked_clients,ln);   // 删除list节点17        c->flags &= ~CLIENT_UNBLOCKED;1819        if (!(c->flags & CLIENT_BLOCKED)) {20            if (c->querybuf && sdslen(c->querybuf) > 0) {21                processInputBuffer(c);  // 向client返回结果22            }23        }24    }25}
复制代码


到这整个超时流程已经走完了,那么消失的几百毫秒去哪了呢?


实际上 beforeSleep 函数是每次事件循环之前都会调用一个函数,因为设置阻塞、阻塞超时检测、阻塞超时返回分别分散在不同的事件中,所以阻塞的超时返回很可能需要跨越事件循环。而 redis 的默认 hz 配置为 10,每个周期间隔为 100ms,至此终于是水落石出。

4. 正常弹出

既然都追查到这了,那就继续看看正常弹出的策略吧。当有另一个 client 对阻塞的 key 进行 push 操作时,阻塞的 client 就会被正常弹出并获取返回值。该监测操作在添加数据的基础函数 dbAdd 中:


1void dbAdd(redisDb *db, robj *key, robj *val) { // db添加键值对2    sds copy = sdsdup(key->ptr);    // 复制key3    int retval = dictAdd(db->dict, copy, val);  // 往字典中添加键值对45    serverAssertWithInfo(NULL,key,retval == DICT_OK);6    if (val->type == OBJ_LIST) signalListAsReady(db, key);  // 如果是list对象 判断是否有阻塞命令在监听7    if (server.cluster_enabled) slotToKeyAdd(key);  // 集群相关操作8 }如果添加的key是list对象,并且处于阻塞中,signalListAsReady就会标识该key已经就绪。
1typedef struct readyList { // server的待弹出list节点 2 redisDb *db; 3 robj *key; 4} readyList; 5 6void signalListAsReady(redisDb *db, robj *key) { // 标识某个阻塞key已经就绪 7 readyList *rl; 8 9 if (dictFind(db->blocking_keys,key) == NULL) return; // 不在阻塞的key中1011 if (dictFind(db->ready_keys,key) != NULL) return; // 防止重复弹出1213 rl = zmalloc(sizeof(*rl));14 rl->key = key;15 rl->db = db;16 incrRefCount(key);17 listAddNodeTail(server.ready_keys,rl); // 将ready_list添加到server.ready_keys的尾部1819 incrRefCount(key);20 serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK); // 将key添加到db的ready_keys字典中21}
复制代码


redisServer 的 ready_keys 是一个 adlist,每个节点都是一个 readyList 对象,保存了 key 和对应的 db。


redisDb 的 ready_keys 是一个 dict 实例,同 client->bpop.keys 一样,value 都是 NULL,主要是利用 dictO(1)高效查找。


在最终的阻塞返回是在 handleClientsBlockedOnLists 函数中,该函数会在每次命令执行完成后被调用:


 1int processCommand(client *c) { // 执行命令 2    // ... 3    /* Exec the command */ 4    if (c->flags & CLIENT_MULTI && 5        c->cmd->proc != execCommand && c->cmd->proc != discardCommand && 6        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) 7    {   // 事务 8        queueMultiCommand(c); 9        addReply(c,shared.queued);10    } else {    // 普通命令11        call(c,CMD_CALL_FULL);  // 调用命令对应函数12        c->woff = server.master_repl_offset;13        if (listLength(server.ready_keys))14            handleClientsBlockedOnLists();  // 处理阻塞key15    }16    return C_OK;17}
复制代码


通过在每次命令完成后对 ready_keys 进行检测,从而保证阻塞的 key 不会被其他的 pop 命令弹出。


 1void handleClientsBlockedOnLists(void) {    // 处理list阻塞弹出返回 2    while(listLength(server.ready_keys) != 0) { 3        list *l; 4 5        l = server.ready_keys; 6        server.ready_keys = listCreate();   // 将ready_keys置空 7 8        while(listLength(l) != 0) { // 遍历server.ready_keys 9            listNode *ln = listFirst(l);10            readyList *rl = ln->value;1112            dictDelete(rl->db->ready_keys,rl->key); // 清除db->ready_keys1314            robj *o = lookupKeyWrite(rl->db,rl->key);   // 查找对应的key15            if (o != NULL && o->type == OBJ_LIST) {16                dictEntry *de;1718                de = dictFind(rl->db->blocking_keys,rl->key);   // 查找db的blocking_keys 获取阻塞client list19                if (de) {20                    list *clients = dictGetVal(de);21                    int numclients = listLength(clients);2223                    while(numclients--) {   // 遍历clients 如果有足够多的值就都弹出返回24                        listNode *clientnode = listFirst(clients);25                        client *receiver = clientnode->value;   // 获取client对象26                        robj *dstkey = receiver->bpop.target;27                        int where = (receiver->lastcmd &&28                                     receiver->lastcmd->proc == blpopCommand) ?29                                    LIST_HEAD : LIST_TAIL;30                        robj *value = listTypePop(o,where); // pop对应list3132                        if (value) {33                            if (dstkey) incrRefCount(dstkey);34                            unblockClient(receiver);    // 解除client的阻塞3536                            if (serveClientBlockedOnList(receiver,37                                rl->key,dstkey,rl->db,value,38                                where) == C_ERR)39                            {   // 如果通知客户端出错就进行回滚,将数据push回list40                                    listTypePush(o,value,where);41                            }4243                            if (dstkey) decrRefCount(dstkey);44                            decrRefCount(value);45                        } else {46                            break;47                        }48                    }49                }50                if (listTypeLength(o) == 0) {51                    dbDelete(rl->db,rl->key);52                }53            }54            // 释放临时变量55            decrRefCount(rl->key);56            zfree(rl);57            listDelNode(l,ln);58        }59        listRelease(l); /* We have the new list on place at this point. */60    }61}
复制代码

5. 小结

redis 是一个单进程服务,通过 IO 多路复用和事件循环提供高并发的服务,因此阻塞命令的实现需要分散到整个事件循环的各个环节中。


检测阻塞超时和返回超时信息都是异步进行的,很可能需要跨越事件循环,redis 默认每个事件循环的间隔为 100ms,这就是导致多出几百 ms 的“元凶”。


redis 每次进行添加操作的时候都会检测是否为阻塞的 list,并且在每次命令执行完成后都会对待弹出 key 进行弹出,保证阻塞命令拥有最高的弹出优先级。


redisServer 持有多个 redisDb,每个 redisDb 又可能对应多个 client,因此每个结构都需要持有不同的阻塞相关属性,用于各个环节的检测和返回。


最后附上一个阻塞命令涉及到的数据结构图:



作者介绍:


吴超艺,新房研发部,16 年 11 月加入贝壳找房,任职 PHP 研发工程师。


本文转载自公众号贝壳产品技术(ID:gh_9afeb423f390)。


原文链接:


https://mp.weixin.qq.com/s/ax1GP_fu8fHs9fNXMQKVyw


2019-09-22 23:42844

评论

发布
暂无评论
发现更多内容
“消失”的阻塞时间_文化 & 方法_吴超艺_InfoQ精选文章