이 글은 Redis 4.0.8 커뮤니티 버전을 기준으로 작성되었습니다
Redis 서버가 받은 명령 요청은 먼저 클라이언트 객체의 querybuf 입력 버퍼에 저장된 후 각 구문을 구문 분석합니다. 명령 요청의 매개변수를 지정하고 이를 클라이언트 개체의 argv 및 argc 필드에 저장합니다.
클라이언트가 명령 요청을 구문 분석하는 입력 함수는 readQueryFromClient입니다. 이 함수는 소켓 데이터를 읽어 클라이언트 개체의 입력 버퍼에 저장하고, processInputBuffer 함수를 호출하여 명령 요청을 구문 분석합니다.
참고: 인라인 명령: 텔넷 세션을 사용하여 명령을 입력합니다
void processInputBuffer(client *c) { ...... //循环遍历输入缓冲区,获取命令参数,调用processMultibulkBuffer解析命令参数和长度 while(sdslen(c->querybuf)) { if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break;//处理telnet方式的内联命令 } else if (c->reqtype == PROTO_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != C_OK) break; //解析命令参数和长度暂存到客户端结构体中 } else { serverPanic("Unknown request type"); } } } //解析命令参数和长度暂存到客户端结构体中 int processMultibulkBuffer(client *c) { //定位到行尾 newline = strchr(c->querybuf,'\r'); //解析命令请求参数数目,并存储在客户端对象的c->multibulklen字段 serverAssertWithInfo(c,NULL,c->querybuf[0] == '*'); ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); c->multibulklen = ll; pos = (newline-c->querybuf)+2;//记录已解析命令的请求长度resp的长度 /* Setup argv array on client structure */ //分配请求参数存储空间 c->argv = zmalloc(sizeof(robj*)*c->multibulklen); // 开始循环解析每个请求参数 while(c->multibulklen) { ...... newline = strchr(c->querybuf+pos,'\r'); if (c->querybuf[pos] != '$') { return C_ERR; ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); pos += newline-(c->querybuf+pos)+2; c->bulklen = ll;//字符串参数长度暂存在客户端对象的bulklen字段 //读取该长度的参数内容,并创建字符串对象,同时更新待解析参数multibulklen c->argv[c->argc++] =createStringObject(c->querybuf+pos,c->bulklen); pos += c->bulklen+2; c->multibulklen--; }
multibulklen 값이 0으로 업데이트되면 매개변수 구문 분석이 완료되고 processCommand가 호출되기 시작합니다. 처리하기 전에 많은 명령이 있습니다. 검증 로직은 다음과 같습니다:
void processInputBuffer(client *c) { ...... //调用processCommand来处理命令 if (processCommand(c) == C_OK) { ...... } } //处理命令函数 int processCommand(client *c) { //校验是否是quit命令 if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; } //调用lookupCommand,查看该命令是否存在 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { flagTransaction(c); addReplyErrorFormat(c,"unknown command '%s'", (char*)c->argv[0]->ptr); return C_OK; //检查用户权限 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { addReply(c,shared.noautherr); //还有很多检查,不一一列举,比如集群/持久化/复制等 /* 真正执行命令 */ if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) queueMultiCommand(c); //将结果写入outbuffer addReply(c,shared.queued); } // 调用execCommand执行命令 void execCommand(client *c) { call(c,CMD_CALL_FULL);//调用call执行命令 //调用execCommand调用call执行命令 void call(client *c, int flags) { start = ustime(); c->cmd->proc(c);//执行命令 duration = ustime()-start; //如果是慢查询,记录慢查询 if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) { char *latency_event = (c->cmd->flags & CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); //记录到慢日志中 slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration); //更新统计信息:当前命令执行时间和调用次数 if (flags & CMD_CALL_STATS) { c->lastcmd->microseconds += duration; c->lastcmd->calls++;
Redis 반환 결과는 클라이언트에 직접 반환되지 않고 먼저 출력 버퍼(buf 필드)에 기록됩니다. 또는 출력 연결 리스트(응답 필드)
int processCommand(client *c) { ...... //将结果写入outbuffer addReply(c,shared.queued); ...... } //将结果写入outbuffer void addReply(client *c, robj *obj) { //调用listAddNodeHead将客户端添加到服务端结构体的client_pending_write链表,以便后续能快速查找出哪些客户端有数据需要发送 if (prepareClientToWrite(c) != C_OK) return; //然后添加字符串到输出缓冲区 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) //如果添加失败,则添加到输出链表中 _addReplyObjectToList(c,obj); }
addReply 이 함수는 클라이언트로 보낼 데이터를 출력 연결 리스트나 출력 버퍼에 임시로 저장만 합니다. 그러면 이 데이터는 언제 클라이언트로 전송되나요? 대답은 이벤트 루프가 켜질 때 호출되는 beforesleep 함수입니다. 이 함수는 만료된 키 삭제, 클라이언트에 명령 응답 반환 등과 같이 시간이 많이 걸리지 않는 일부 작업을 구체적으로 수행합니다.
위 내용은 Redis 명령 처리 예시 소스코드 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!