Redis指令處理過程實例原始碼分析
本文基於社群版Redis 4.0.8
1、命令解析
Redis伺服器接收到的命令請求首先儲存在客戶端物件的querybuf輸入緩衝區,然後解析指令請求的各個參數,並儲存在客戶端物件的argv和argc欄位。
客戶端解析指令請求的入口函數為readQueryFromClient,會讀取socket資料儲存到客戶端物件的輸入緩衝區,並呼叫函數processInputBuffer解析命令請求。
註:內嵌指令:使用telnet會話輸入指令的方式
void processInputBuffer(client *c) { ...... //循环遍历输入缓冲区,获取命令参数,调用processMultibulkBuffer解析命令参数和长度 while(sdslen(c->querybuf)) { if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break;//处理telnet方式的内联命令 } else if (c->reqtype == PROTO_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != C_OK) break; //解析命令参数和长度暂存到客户端结构体中 } else { serverPanic("Unknown request type"); } } } //解析命令参数和长度暂存到客户端结构体中 int processMultibulkBuffer(client *c) { //定位到行尾 newline = strchr(c->querybuf,'\r'); //解析命令请求参数数目,并存储在客户端对象的c->multibulklen字段 serverAssertWithInfo(c,NULL,c->querybuf[0] == '*'); ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); c->multibulklen = ll; pos = (newline-c->querybuf)+2;//记录已解析命令的请求长度resp的长度 /* Setup argv array on client structure */ //分配请求参数存储空间 c->argv = zmalloc(sizeof(robj*)*c->multibulklen); // 开始循环解析每个请求参数 while(c->multibulklen) { ...... newline = strchr(c->querybuf+pos,'\r'); if (c->querybuf[pos] != '$') { return C_ERR; ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); pos += newline-(c->querybuf+pos)+2; c->bulklen = ll;//字符串参数长度暂存在客户端对象的bulklen字段 //读取该长度的参数内容,并创建字符串对象,同时更新待解析参数multibulklen c->argv[c->argc++] =createStringObject(c->querybuf+pos,c->bulklen); pos += c->bulklen+2; c->multibulklen--; }
2、指令呼叫
當multibulklen的值更新為0時,表示參數解析完成,開始呼叫processCommand來處理指令,處理指令前有許多校驗邏輯,如下:
void processInputBuffer(client *c) { ...... //调用processCommand来处理命令 if (processCommand(c) == C_OK) { ...... } } //处理命令函数 int processCommand(client *c) { //校验是否是quit命令 if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; } //调用lookupCommand,查看该命令是否存在 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { flagTransaction(c); addReplyErrorFormat(c,"unknown command '%s'", (char*)c->argv[0]->ptr); return C_OK; //检查用户权限 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { addReply(c,shared.noautherr); //还有很多检查,不一一列举,比如集群/持久化/复制等 /* 真正执行命令 */ if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) queueMultiCommand(c); //将结果写入outbuffer addReply(c,shared.queued); } // 调用execCommand执行命令 void execCommand(client *c) { call(c,CMD_CALL_FULL);//调用call执行命令 //调用execCommand调用call执行命令 void call(client *c, int flags) { start = ustime(); c->cmd->proc(c);//执行命令 duration = ustime()-start; //如果是慢查询,记录慢查询 if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) { char *latency_event = (c->cmd->flags & CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); //记录到慢日志中 slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration); //更新统计信息:当前命令执行时间和调用次数 if (flags & CMD_CALL_STATS) { c->lastcmd->microseconds += duration; c->lastcmd->calls++;
3、回傳結果
Redis回傳結果並不是直接傳回給客戶端,而是先寫入到輸出緩衝區(buf欄位)或輸出鍊錶(reply欄位)
int processCommand(client *c) { ...... //将结果写入outbuffer addReply(c,shared.queued); ...... } //将结果写入outbuffer void addReply(client *c, robj *obj) { //调用listAddNodeHead将客户端添加到服务端结构体的client_pending_write链表,以便后续能快速查找出哪些客户端有数据需要发送 if (prepareClientToWrite(c) != C_OK) return; //然后添加字符串到输出缓冲区 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) //如果添加失败,则添加到输出链表中 _addReplyObjectToList(c,obj); }
addReply函數只是將待傳送給客戶端的資料暫存在輸出鍊錶或輸出緩衝區,那麼什麼時候將這些資料傳送給客戶端呢?答案是開啟事件循環時,呼叫的beforesleep函數,該函數專門執行一些不是很費時的操作,如過期鍵刪除,向客戶端返回命令回復等
void beforeSleep(struct aeEventLoop *eventLoop) { ...... /* Handle writes with pending output buffers. */ handleClientsWithPendingWrites(); } //回复客户端命令函数 int handleClientsWithPendingWrites(void) { listIter li; listNode *ln; int processed = listLength(server.clients_pending_write); listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; listDelNode(server.clients_pending_write,ln); /* 发送客户端数据 */ if (writeToClient(c->fd,c,0) == C_ERR) continue; /* If there is nothing left, do nothing. Otherwise install * the write handler. */ //如果数据量很大,一次性没有发送完成,则进行添加文件事件,监听当前客户端socket文件描述符的可写事件即可 if (clientHasPendingReplies(c) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) == AE_ERR) { freeClientAsync(c); } } return processed;
以上是Redis指令處理過程實例原始碼分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

熱門話題

Redis集群模式通過分片將Redis實例部署到多個服務器,提高可擴展性和可用性。搭建步驟如下:創建奇數個Redis實例,端口不同;創建3個sentinel實例,監控Redis實例並進行故障轉移;配置sentinel配置文件,添加監控Redis實例信息和故障轉移設置;配置Redis實例配置文件,啟用集群模式並指定集群信息文件路徑;創建nodes.conf文件,包含各Redis實例的信息;啟動集群,執行create命令創建集群並指定副本數量;登錄集群執行CLUSTER INFO命令驗證集群狀態;使

如何清空 Redis 數據:使用 FLUSHALL 命令清除所有鍵值。使用 FLUSHDB 命令清除當前選定數據庫的鍵值。使用 SELECT 切換數據庫,再使用 FLUSHDB 清除多個數據庫。使用 DEL 命令刪除特定鍵。使用 redis-cli 工具清空數據。

使用 Redis 指令需要以下步驟:打開 Redis 客戶端。輸入指令(動詞 鍵 值)。提供所需參數(因指令而異)。按 Enter 執行指令。 Redis 返迴響應,指示操作結果(通常為 OK 或 -ERR)。

Redis 使用單線程架構,以提供高性能、簡單性和一致性。它利用 I/O 多路復用、事件循環、非阻塞 I/O 和共享內存來提高並發性,但同時存在並發性受限、單點故障和不適合寫密集型工作負載的局限性。

理解 Redis 源碼的最佳方法是逐步進行:熟悉 Redis 基礎知識。選擇一個特定的模塊或功能作為起點。從模塊或功能的入口點開始,逐行查看代碼。通過函數調用鏈查看代碼。熟悉 Redis 使用的底層數據結構。識別 Redis 使用的算法。

Redis 使用哈希表存儲數據,支持字符串、列表、哈希表、集合和有序集合等數據結構。 Redis 通過快照 (RDB) 和追加只寫 (AOF) 機制持久化數據。 Redis 使用主從復制來提高數據可用性。 Redis 使用單線程事件循環處理連接和命令,保證數據原子性和一致性。 Redis 為鍵設置過期時間,並使用 lazy 刪除機制刪除過期鍵。

要從 Redis 讀取隊列,需要獲取隊列名稱、使用 LPOP 命令讀取元素,並處理空隊列。具體步驟如下:獲取隊列名稱:以 "queue:" 前綴命名,如 "queue:my-queue"。使用 LPOP 命令:從隊列頭部彈出元素並返回其值,如 LPOP queue:my-queue。處理空隊列:如果隊列為空,LPOP 返回 nil,可先檢查隊列是否存在再讀取元素。

使用Redis進行鎖操作需要通過SETNX命令獲取鎖,然後使用EXPIRE命令設置過期時間。具體步驟為:(1) 使用SETNX命令嘗試設置一個鍵值對;(2) 使用EXPIRE命令為鎖設置過期時間;(3) 當不再需要鎖時,使用DEL命令刪除該鎖。
