Redisコマンド処理サンプルのソースコード分析

王林
リリース: 2023-05-26 13:10:44
転載
1364 人が閲覧しました

この記事は、Redis 4.0.8 のコミュニティ バージョンに基づいています

Redisコマンド処理サンプルのソースコード分析

1. コマンド分析

Redis サーバーが受信したコマンド リクエストは、次のとおりです。最初にクライアント オブジェクトに格納されます。次に、querybuf 入力バッファはコマンド リクエストのさまざまなパラメータを解析し、それらをクライアント オブジェクトの argv および argc フィールドに格納します。

クライアントがコマンド リクエストを解析するためのエントリ関数は readQueryFromClient です。これはソケット データを読み取ってクライアント オブジェクトの入力バッファに格納し、関数 processInputBuffer を呼び出してコマンド リクエストを解析します。

注: インライン コマンド: Telnet セッションを使用してコマンド

void processInputBuffer(client *c) {
    ......
    //循环遍历输入缓冲区,获取命令参数,调用processMultibulkBuffer解析命令参数和长度
    while(sdslen(c->querybuf)) {
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;//处理telnet方式的内联命令
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break; //解析命令参数和长度暂存到客户端结构体中
        } else {
            serverPanic("Unknown request type");
        }
    }    
}

//解析命令参数和长度暂存到客户端结构体中
int processMultibulkBuffer(client *c) {
    //定位到行尾
    newline = strchr(c->querybuf,'\r');
    //解析命令请求参数数目,并存储在客户端对象的c->multibulklen字段
    serverAssertWithInfo(c,NULL,c->querybuf[0] == '*');
    ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
    c->multibulklen = ll;
    pos = (newline-c->querybuf)+2;//记录已解析命令的请求长度resp的长度
    /* Setup argv array on client structure */
    //分配请求参数存储空间
    c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
    
    // 开始循环解析每个请求参数
    while(c->multibulklen) {
        ......
        newline = strchr(c->querybuf+pos,'\r');
        if (c->querybuf[pos] != '$') {
            return C_ERR;
        ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
        pos += newline-(c->querybuf+pos)+2;
        c->bulklen = ll;//字符串参数长度暂存在客户端对象的bulklen字段
        
        //读取该长度的参数内容,并创建字符串对象,同时更新待解析参数multibulklen
        c->argv[c->argc++] =createStringObject(c->querybuf+pos,c->bulklen);
        pos += c->bulklen+2;
        c->multibulklen--;
    }
ログイン後にコピー

2 を入力します。コマンド呼び出し

multibulklen の値が 0 に更新されると、パラメータの解析が完了しました。コマンドを処理するために processCommand の呼び出しを開始します。コマンドを処理する前に、次のように多くの検証ロジックがあります:

void processInputBuffer(client *c) {
    
    ......
     //调用processCommand来处理命令
     if (processCommand(c) == C_OK) {
         ......
     }
}

//处理命令函数
int processCommand(client *c) {
    //校验是否是quit命令
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }
    //调用lookupCommand,查看该命令是否存在
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return C_OK;
    //检查用户权限
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        addReply(c,shared.noautherr);
    //还有很多检查,不一一列举,比如集群/持久化/复制等
    /* 真正执行命令 */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        queueMultiCommand(c);
        //将结果写入outbuffer
        addReply(c,shared.queued);
    } 
// 调用execCommand执行命令
void execCommand(client *c) {
    call(c,CMD_CALL_FULL);//调用call执行命令
//调用execCommand调用call执行命令
void call(client *c, int flags) {
    start = ustime();
    c->cmd->proc(c);//执行命令
    duration = ustime()-start;
    //如果是慢查询,记录慢查询
    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        //记录到慢日志中
        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
    //更新统计信息:当前命令执行时间和调用次数
    if (flags & CMD_CALL_STATS) {
        c->lastcmd->microseconds += duration;
        c->lastcmd->calls++;
ログイン後にコピー

3. 結果を返す

Redis によって返された結果はクライアントに直接返されず、最初に書き込まれます。 出力バッファ (buf フィールド) または出力リンク リスト (応答フィールド) を入力します。

int processCommand(client *c) {
    ......
    //将结果写入outbuffer
    addReply(c,shared.queued);
    ......
    
}
//将结果写入outbuffer
void addReply(client *c, robj *obj) {
    //调用listAddNodeHead将客户端添加到服务端结构体的client_pending_write链表,以便后续能快速查找出哪些客户端有数据需要发送
    if (prepareClientToWrite(c) != C_OK) return;
    
    //然后添加字符串到输出缓冲区
    if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
        //如果添加失败,则添加到输出链表中
        _addReplyObjectToList(c,obj); 
}
ログイン後にコピー

addReply 関数は、クライアントに送信されるデータを一時的に保存するだけです。リンク リストまたは出力バッファを出力すると、データはいつクライアントに送信されますか?答えは、イベント ループがオンになったときに呼び出される beforesleep 関数です。この関数は、特に、期限切れのキーの削除、コマンド応答をクライアントに返すなど、それほど時間のかからないいくつかの操作を実行します。

void beforeSleep(struct aeEventLoop *eventLoop) {
    ......
     /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
}

//回复客户端命令函数
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);
        /* 发送客户端数据 */
        if (writeToClient(c->fd,c,0) == C_ERR) continue;
        /* If there is nothing left, do nothing. Otherwise install
         * the write handler. */
         //如果数据量很大,一次性没有发送完成,则进行添加文件事件,监听当前客户端socket文件描述符的可写事件即可
        if (clientHasPendingReplies(c) &&
            aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                sendReplyToClient, c) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    return processed;
ログイン後にコピー

以上がRedisコマンド処理サンプルのソースコード分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:yisu.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート