本篇文章為大家帶來了關於Redis的相關知識,其中主要介紹了關於原子操作中命令原子性的相關問題,包括了處理並發的方案、程式設計模型、多IO線程以及單一命令的相關內容,下面一起看一下,希望對大家有幫助。
推薦學習:Redis影片教學
業務中有時候我們會用Redis 處理一些高並發的業務場景,例如,秒殺業務,對於庫存的操作。 。 。
先來分析下,並發場景下會發生什麼問題
並發問題主要發生在資料的修改上,對於客戶端修改數據,一般分成下面兩個步驟:
1、客戶端先把資料讀取到本地,在本地進行修改;
2、客戶端修改完資料後,再寫回Redis。
我們把這個流程叫做讀取-修改-寫回
操作(Read-Modify-Write
,簡稱 RMW 操作)。如果客戶端並發進行 RMW 操作的時候,就需要保證 讀取-修改-寫回
是一個原子操作,進行指令操作的時候,其他客戶端不能對目前的資料進行操作。
錯誤的栗子:
統計一個頁面的訪問次數,每次刷新頁面訪問次數 1,這裡使用 Redis 來記錄訪問次數。
如果每次的讀取-修改-寫回
操作不是一個原子操作,那麼就可能存在下圖的問題,客戶端2在客戶端1操作的中途,也取得Redis 的值,也對值進行1,操作,這樣就導致最終資料的錯誤。
對於上面的這種情況,一般會有兩種方式解決:
1、使用Redis 實作一把分散式鎖,透過鎖定來保護每次只有一個執行緒來操作臨界資源;
2、實作操作指令的原子性。
讀取-修改-寫回
是一個原子性的命令,那麼這個命令在操作過程中就不有別的執行緒同時讀取操作數據,這樣就能避免上面栗子出現的問題。 下面從原子性和鎖定兩個方面,具體分析下,對並發存取問題的處理
非阻塞的read 請求在資料未準備好的情況下立即返回,可以繼續往下執行,此時應用程式不斷輪詢內核,直到資料準備好,內核將資料拷貝到應用程式緩衝區,read 呼叫才可以取得結果。
這裡最後一次 read 調用,取得資料的過程,是一個同步的過程,是需要等待的過程。這裡的同步指的是核心態的資料拷貝到使用者程式的快取區這個過程。
發起非同步I/O,就立即返回,核心自動將資料從核心空間拷貝到使用者空間,這個拷貝過程同樣是異步的,核心自動完成的,和前面的同步操作不一樣,應用程式並不需要主動發起拷貝動作。
舉個你去飯堂吃飯的例子,你好比應用程序,飯堂好比操作系統。
阻塞I/O 好比,你去飯堂吃飯,但是飯堂的菜還沒做好,然後你就一直在那裡等啊等,等了好長一段時間終於等到飯堂阿姨把菜端了出來(資料準備的過程),但你還得繼續等阿姨把菜(內核空間)打到你的便當(使用者空間),經歷完這兩個過程,你才可以離開。
非阻塞I/O 好比,你去了飯堂,問阿姨菜做好了沒有,阿姨告訴你沒,你就離開了,過幾十分鐘,你又來飯堂問阿姨,阿姨說做好了,於是阿姨幫你把菜打到你的便當盒裡,這個過程你是得等待的。
非同步 I/O 好比,你讓飯堂阿姨將菜做好並把菜打到飯盒裡後,把飯盒送到你面前,整個過程你都不需要任何等待。
在web 服務中,處理web 請求通常有兩個體系結構,分別為:thread-based architecture
(基於執行緒的架構)、event-driven architecture
(事件驅動模型)
thread-based architecture(基於執行緒的架構):這種比較容易理解,就是多執行緒並發模式,服務端在處理請求的時候,一個請求分配一個獨立的執行緒來處理。
因為每個請求分配一個獨立的線程,所以單一線程的阻塞不會影響到其他的線程,能夠提高程式的回應速度。
不足的是,連線和執行緒之間始終保持一對一的關係,如果是一直處於Keep-Alive 狀態的長連線將會導致大量工作執行緒在空閒狀態下等待,例如,檔案系統訪問,網路等。此外,成百上千的連接也可能導致並發線程浪費大量記憶體的堆疊空間。
#事件驅動的體系結構由事件生產者和事件消費者群組,是一種鬆散耦合、分散式的驅動架構,生產者收集到某應用產生的事件後即時對事件採取必要的處理後路由至下游系統,無需等待系統回應,下游的事件消費者群組收到是事件訊息,非同步的處理。
事件驅動架構有以下優勢:
降低事件生產者和訂閱者的耦合性。事件生產者只需關注事件的發生,無需關注事件如何處理以及被分發給哪些訂閱者。任何一個環節故障,不會影響其他業務正常運作。
事件驅動架構適用於非同步場景,即使是需求高峰期,收集各種來源的事件後保留在事件匯流排中,然後逐步分發傳遞事件,不會造成系統擁塞或資源過剩的情況。
事件驅動架構中路由與篩選能力支援分割服務,以便於擴充和路由分發。
Reactor 模式與Proactor 模式都是 event-driven architecture
(事件驅動模型)的實作方式,這裡具體分析下
Reactor 模式,是指透過一個或多個輸入同時傳遞給服務處理器的服務請求的事件驅動處理模式。
在處理⽹絡 IO 的連接事件、讀取事件、寫入事件。 Reactor 中引入了三類角色
Reactor 模型又分為 3 個類別:
#建立連線(Acceptor)、監聽accept、read、write事件(Reactor)、處理事件(Handler )都只用一個單線程;
與單線程模式不同的是,添加了一個工作者執行緒池,並將非 I/O
作業從Reactor 執行緒移出轉交給工作者執行緒池(Thread Pool)來執行。
建立連線(Acceptor)和 監聽accept、read、write事件(Reactor),重複使用一個執行緒。
工作執行緒池:處理事件(Handler),由一個工作執行緒池來執行業務邏輯,包括資料就緒後,使用者狀態的資料讀寫。
#對於多個CPU的機器,為充分利用系統資源,將Reactor 拆分為兩部分:mainReactor 和subReactor。
mainReactor:負責監聽server socket
,用來處理網路新連線的建立,將建立的socketChannel指定註冊給subReactor,通常一個執行緒就可以處理;
subReactor:監聽accept、read、write
事件(Reactor
),包含等待資料就緒時,核心態的資料讀寫,通常使用多執行緒。
工作線程:處理事件(Handler)可以和 subReactor 共同使用同一個線程,也可以做成線程池,類似上面多線程 Reactor 模式下的工作線程池的處理方式。
reactor 流程與Reactor 模式類似
不同點就是
在每次感知到有事件發生(例如可讀就緒事件)後,就需要應用程式主動呼叫read 方法來完成資料的讀取,也就是要應用程式主動將socket 接收快取中的資料讀到應用程式記憶體中,這個過程是同步的,讀取完資料後應用進程才能處理資料。
在發起非同步讀寫請求時,需要傳入資料緩衝區的位址(用來存放結果資料)等訊息,這樣系統核心才可以自動幫我們把資料的讀寫工作完成,這裡的讀寫工作全程由作業系統來做,並不需要像Reactor 那樣還需要應用進程主動發起 read/write
來讀寫數據,作業系統完成讀寫工作後,就會通知應用程式直接處理資料。
因此,Reactor 可以理解為「來了事件作業系統通知應用程式,讓應用程式來處理」,而Proactor 可以理解為「來了事件作業系統來處理,處理完再通知應用程式」 。
舉個實際生活中的例子,Reactor 模式就是快遞員在樓下,打電話給你告訴你快遞到你家小區了,你需要自己下樓來拿快遞。而在 Proactor 模式下,快遞員直接將快遞送到你家門口,然後通知你。
Redis 中使用是單線程,可能處於以下幾方面的考慮
1、Redis 是純記憶體的操作,執行速度是非常快的,因此這部分操作通常不會是效能瓶頸,效能瓶頸在於網路I/O;
2、避免過多的上下文切換開銷,單執行緒則可以規避進程內頻繁的線程切換開銷;
3、避免同步機制的開銷,多線程必然會面臨對於共享資源的訪問,這時候通常的做法就是加鎖,雖然是多線程,這時候就會變成串行的存取。也就是多執行緒程式設計模式會面臨的共享資源的並發存取控制問題;
4、簡單可維護,多執行緒也會引入同步原語來保護共享資源的並發訪問,程式碼的可維護性和易讀性將會下降。
Redis 在v6.0 版本之前,Redis 的核心網路模型一直是典型的單Reactor 模型:利用 epoll/select/kqueue
等多路復用技術,在單執行緒的事件循環中不斷去處理事件(客戶端請求),最後回寫回應資料到客戶端:
這裡來看下Redis 如何使用單執行緒處理任務
Redis 的網路框架實作了Reactor 模型,並且自行開發實作了一個事件驅動框架。
事件驅動框架的邏輯簡單點講就是
來看下Redis 中事件驅動框架實作的幾個主要函數
// 执行事件捕获,分发和处理循环 void aeMain(aeEventLoop *eventLoop); // 用来注册监听的事件和事件对应的处理函数。只有对事件和处理函数进行了注册,才能在事件发生时调用相应的函数进行处理。 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); // aeProcessEvents 函数实现的主要功能,包括捕获事件、判断事件类型和调用具体的事件处理函数,从而实现事件的处理 int aeProcessEvents(aeEventLoop *eventLoop, int flags);
使用aeMain 作為主迴圈來持續監聽事件和捕獲,其中會呼叫aeProcessEvents 函數,實現事件捕獲、判斷事件類型和呼叫特定的事件處理函數,從而實現事件的處理。
// https://github.com/redis/redis/blob/5.0/src/ae.c#L496 void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } } // https://github.com/redis/redis/blob/5.0/src/ae.c#L358 int aeProcessEvents(aeEventLoop *eventLoop, int flags) { ... if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { ... //调用aeApiPoll函数捕获事件 numevents = aeApiPoll(eventLoop, tvp); ... } ... }
可以看到 aeProcessEvents 中對於 IO 事件的捕獲是透過呼叫 aeApiPoll 來完成的。
aeApiPoll 是I/O 多路復用API,是基於 epoll_wait/select/kevent
等系統呼叫的封裝,監聽等待讀取和寫入事件觸發,然後處理,它是事件循環(Event Loop)中的核心函數,是事件驅動得以運作的基礎。
Redis 是依賴作業系統底層提供的 IO 多路復用機制,來實現事件捕獲,檢查是否有新的連接、讀寫事件發生。為了適配不同的作業系統,Redis 對不同作業系統實現的網路 IO 多路復用函數,都進行了統一的封裝。
// https://github.com/redis/redis/blob/5.0/src/ae.c#L49 #ifdef HAVE_EVPORT #include "ae_evport.c" // Solaris #else #ifdef HAVE_EPOLL #include "ae_epoll.c" // Linux #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" // MacOS #else #include "ae_select.c" // Windows #endif #endif #endif
ae_epoll.c:對應Linux 上的IO 複用函數epoll;
ae_evport.c:對應Solaris 上的IO 復用函數evport;
ae_kqueue.c :對應macOS 或FreeBSD 上的IO 復用函數kqueue;
ae_select.c:對應Linux(或Windows)的IO 復用函數select。
監聽socket 的讀取事件,當有客戶端連線請求過來,使用函數acceptTcpHandler 和客戶端建立連線
#當Redis 啟動後,伺服器程式的main 函數會呼叫initSever 函數來進行初始化,而在初始化的過程中,aeCreateFileEvent 就會被initServer 函數調用,用於註冊要監聽的事件,以及對應的事件處理函數。
// https://github.com/redis/redis/blob/5.0/src/server.c#L2036 void initServer(void) { ... // 创建一个事件处理程序以接受 TCP 和 Unix 中的新连接 for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } ... }
可以看到initServer 中會根據啟用的IP 連接埠個數,為每個IP 連接埠上的網路事件,呼叫aeCreateFileEvent,建立對AE_READABLE 事件的監聽,並且註冊AE_READABLE 事件的處理handler,也就是acceptTcpHandler 函式。
然後看下acceptTcpHandler 的實作
// https://github.com/redis/redis/blob/5.0/src/networking.c#L734 void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata); while(max--) { // 用于accept客户端的连接,其返回值是客户端对应的socket cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); // 会调用acceptCommonHandler对连接以及客户端进行初始化 acceptCommonHandler(cfd,0,cip); } } // https://github.com/redis/redis/blob/5.0/src/networking.c#L664 static void acceptCommonHandler(int fd, int flags, char *ip) { client *c; // 分配并初始化新客户端 if ((c = createClient(fd)) == NULL) { serverLog(LL_WARNING, "Error registering fd event for the new client: %s (fd=%d)", strerror(errno),fd); close(fd); /* May be already closed, just ignore errors */ return; } // 判断当前连接的客户端是否超过最大值,如果超过的话,会拒绝这次连接。否则,更新客户端连接数的计数 if (listLength(server.clients) > server.maxclients) { char *err = "-ERR max number of clients reached\r\n"; /* That's a best effort error message, don't check write errors */ if (write(c->fd,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... */ } server.stat_rejected_conn++; freeClient(c); return; } ... } // 使用多路复用,需要记录每个客户端的状态,client 之前通过链表保存 typedef struct client { int fd; // 字段是客户端套接字文件描述符 sds querybuf; // 保存客户端发来命令请求的输入缓冲区。以Redis通信协议的方式保存 int argc; // 当前命令的参数数量 robj **argv; // 当前命令的参数 redisDb *db; // 当前选择的数据库指针 int flags; list *reply; // 保存命令回复的链表。因为静态缓冲区大小固定,主要保存固定长度的命令回复,当处理一些返回大量回复的命令,则会将命令回复以链表的形式连接起来。 // ... many other fields ... char buf[PROTO_REPLY_CHUNK_BYTES]; } client; client *createClient(int fd) { client *c = zmalloc(sizeof(client)); // 如果fd为-1,表示创建的是一个无网络连接的伪客户端,用于执行lua脚本的时候。 // 如果fd不等于-1,表示创建一个有网络连接的客户端 if (fd != -1) { // 设置fd为非阻塞模式 anetNonBlock(NULL,fd); // 禁止使用 Nagle 算法,client向内核递交的每个数据包都会立即发送给server出去,TCP_NODELAY anetEnableTcpNoDelay(NULL,fd); // 如果开启了tcpkeepalive,则设置 SO_KEEPALIVE if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); // 创建一个文件事件状态el,且监听读事件,开始接受命令的输入 if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } ... // 初始化client 中的参数 return c; }
1、acceptTcpHandler 主要用於處理和客戶端連接的建立;
2、其中會呼叫函數anetTcpAccept 用於accept 客戶端的連接,其回傳值是客戶端對應的socket;
3、然後呼叫acceptCommonHandler 對連接以及客戶端進行初始化;
4、初始化客戶端的時候,同時使用aeCreateFileEvent 用來註冊監聽的事件和事件對應的處理函數,將readQueryFromClient 命令讀取處理器綁定到新連接對應的文件描述符上;
5、伺服器會監聽該文件描述符的讀取事件,當客戶端發送了命令,觸發了AE_READABLE 事件,那麼就會呼叫回呼函數readQueryFromClient() 來從檔案描述子fd 中讀發來的命令,並保存在輸入緩衝區中querybuf。
readQueryFromClient 是請求處理的起點,解析並執行客戶端的請求指令。
// https://github.com/redis/redis/blob/5.0/src/networking.c#L1522 // 读取client的输入缓冲区的内容 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*) privdata; int nread, readlen; size_t qblen; UNUSED(el); UNUSED(mask); ... // 输入缓冲区的长度 qblen = sdslen(c->querybuf); // 更新缓冲区的峰值 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; // 扩展缓冲区的大小 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // 调用read从描述符为fd的客户端socket中读取数据 nread = read(fd, c->querybuf+qblen, readlen); ... // 处理读取的内容 processInputBufferAndReplicate(c); } // https://github.com/redis/redis/blob/5.0/src/networking.c#L1507 void processInputBufferAndReplicate(client *c) { // 当前客户端不属于主从复制中的Master // 直接调用 processInputBuffer,对客户端输入缓冲区中的命令和参数进行解析 if (!(c->flags & CLIENT_MASTER)) { processInputBuffer(c); // 客户端属于主从复制中的Master // 调用processInputBuffer函数,解析客户端命令, // 调用replicationFeedSlavesFromMasterStream 函数,将主节点接收到的命令同步给从节点 } else { size_t prev_offset = c->reploff; processInputBuffer(c); size_t applied = c->reploff - prev_offset; if (applied) { replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied); sdsrange(c->pending_querybuf,applied,-1); } } } // https://github.com/redis/redis/blob/5.0/src/networking.c#L1428 void processInputBuffer(client *c) { server.current_client = c; /* Keep processing while there is something in the input buffer */ // 持续读取缓冲区的内容 while(c->qb_pos < sdslen(c->querybuf)) { ... /* Multibulk processing could see a <= 0 length. */ // 如果参数为0,则重置client if (c->argc == 0) { resetClient(c); } else { /* Only reset the client when the command was executed. */ // 执行命令成功后重置client if (processCommand(c) == C_OK) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; } // 命令处于阻塞状态中的客户端,不需要进行重置 if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) resetClient(c); } /* freeMemoryIfNeeded may flush slave output buffers. This may * result into a slave, that may be the active client, to be * freed. */ if (server.current_client == NULL) break; } } /* Trim to pos */ if (server.current_client != NULL && c->qb_pos) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } server.current_client = NULL; }
1、readQueryFromClient(),從檔案描述子fd 中讀出資料到輸入緩衝區querybuf 中;
2、使用processInputBuffer 函式完成對指令的解析,在其中使用processInlineBuffer或processMultibulkBuffer 根據Redis 協定解析指令;
3、完成對一個指令的解析,就使用processCommand 對指令就行執行;
4、指令執行完成,最後呼叫addReply 函數族的一系列函數將回應資料寫入對應client 的寫出緩衝區:client->buf 或client->reply ,client->buf 是首選的寫出緩衝區,固定大小16KB,一般來說可以緩衝足夠多的回應數據,但是如果客戶端在時間視窗內需要回應的資料非常大,那麼則會自動切換到client->reply 鍊錶上去,使用鍊錶理論上能夠保存無限大的資料(受限於機器的實體記憶體),最後把client 加進一個LIFO 佇列clients_pending_write;
Redis sever 客戶端緩衝區中的資料寫回客戶端。
// https://github.com/redis/redis/blob/5.0/src/server.c#L1380 void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); ... // 将 Redis sever 客户端缓冲区中的数据写回客户端 handleClientsWithPendingWrites(); ... } // https://github.com/redis/redis/blob/5.0/src/networking.c#L1082 int handleClientsWithPendingWrites(void) { listIter li; listNode *ln; // 遍历 clients_pending_write 队列,调用 writeToClient 把 client 的写出缓冲区里的数据回写到客户端 int processed = listLength(server.clients_pending_write); listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; listDelNode(server.clients_pending_write,ln); ... // 调用 writeToClient 函数,将客户端输出缓冲区中的数据写回 if (writeToClient(c->fd,c,0) == C_ERR) continue; // 如果输出缓冲区的数据还没有写完,此时,handleClientsWithPendingWrites 函数就 // 会调用 aeCreateFileEvent 函数,创建可写事件,并设置回调函数 sendReplyToClien if (clientHasPendingReplies(c)) { int ae_flags = AE_WRITABLE; if (server.aof_state == AOF_ON && server.aof_fsync == AOF_FSYNC_ALWAYS) { ae_flags |= AE_BARRIER; } // 将文件描述符fd和AE_WRITABLE事件关联起来,当客户端可写时,就会触发事件,调用sendReplyToClient()函数,执行写事件 if (aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) { freeClientAsync(c); } } } return processed; } // https://github.com/redis/redis/blob/5.0/src/networking.c#L1072 // 写事件处理程序,只是发送回复给client void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(el); UNUSED(mask); writeToClient(fd,privdata,1); } // https://github.com/redis/redis/blob/5.0/src/networking.c#L979 // 将输出缓冲区的数据写给client,如果client被释放则返回C_ERR,没被释放则返回C_OK int writeToClient(int fd, client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; size_t objlen; clientReplyBlock *o; // 如果指定的client的回复缓冲区中还有数据,则返回真,表示可以写socket while(clientHasPendingReplies(c)) { // 固定缓冲区发送未完成 if (c->bufpos > 0) { // 将缓冲区的数据写到fd中 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); ... // 如果发送的数据等于buf的偏移量,表示发送完成 if ((int)c->sentlen == c->bufpos) { c->bufpos = 0; c->sentlen = 0; } // 固定缓冲区发送完成,发送回复链表的内容 } else { // 回复链表的第一条回复对象,和对象值的长度和所占的内存 o = listNodeValue(listFirst(c->reply)); objlen = o->used; if (objlen == 0) { c->reply_bytes -= o->size; listDelNode(c->reply,listFirst(c->reply)); continue; } // 将当前节点的值写到fd中 nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; ... } ... } ... // 如果指定的client的回复缓冲区中已经没有数据,发送完成 if (!clientHasPendingReplies(c)) { c->sentlen = 0; // 删除当前client的可读事件的监听 if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ // 如果指定了写入按成之后立即关闭的标志,则释放client if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { freeClient(c); return C_ERR; } } return C_OK; }
2、如果输出缓冲区的数据还没有写完,此时,handleClientsWithPendingWrites 函数就会调用 aeCreateFileEvent 函数,注册 sendReplyToClient 到该连接的写就绪事件,等待将后续将数据写回给客户端。
上面的执行流程总结下来就是
1、Redis Server
启动后,主线程会启动一个时间循环(Event Loop),持续监听事件;
2、client 到 server 的新连接,会调用 acceptTcpHandler 函数,之后会注册读事件 readQueryFromClient 函数,client 发给 server 的数据,都会在这个函数处理,这个函数会解析 client 的数据,找到对应的 cmd 函数执行;
3、cmd 逻辑执行完成后,server 需要写回数据给 client,调用 addReply 函数族的一系列函数将响应数据写入到对应 client 的写出缓冲区:client->buf
或者 client->reply
,client->buf
是首选的写出缓冲区,固定大小 16KB,一般来说可以缓冲足够多的响应数据,但是如果客户端在时间窗口内需要响应的数据非常大,那么则会自动切换到 client->reply
链表上去,使用链表理论上能够保存无限大的数据(受限于机器的物理内存),最后把 client 添加进一个 LIFO 队列 clients_pending_write
;
4、在 Redis 事件驱动框架每次循环进入事件处理函数前,来处理监听到的已触发事件或是到时的时间事件之前,都会调用 beforeSleep 函数,进行一些任务处理,这其中就包括了调用 handleClientsWithPendingWrites 函数,它会将 Redis sever 客户端缓冲区中的数据写回客户端;
在 Redis6.0 的版本中,引入了多线程来处理 IO 任务,多线程的引入,充分利用了当前服务器多核特性,使用多核运行多线程,让多线程帮助加速数据读取、命令解析以及数据写回的速度,提升 Redis 整体性能。
Redis6.0 之前的版本用的是单线程 Reactor 模式,所有的操作都在一个线程中完成,6.0 之后的版本使用了主从 Reactor 模式。
由一个 mainReactor 线程接收连接,然后发送给多个 subReactor 线程处理,subReactor 负责处理具体的业务。
来看下 Redis 多IO线程的具体实现过程
使用 initThreadedIO 函数来初始化多 IO 线程。
// https://github.com/redis/redis/blob/6.2/src/networking.c#L3573 void initThreadedIO(void) { server.io_threads_active = 0; /* We start with threads not active. */ /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ // 如果用户只配置了一个 I/O 线程,不需要创建新线程了,直接在主线程中处理 if (server.io_threads_num == 1) return; if (server.io_threads_num > IO_THREADS_MAX_NUM) { serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " "The maximum number is %d.", IO_THREADS_MAX_NUM); exit(1); } /* Spawn and initialize the I/O threads. */ // 初始化线程 for (int i = 0; i < server.io_threads_num; i++) { /* Things we do for all the threads including the main thread. */ io_threads_list[i] = listCreate(); // 编号为0是主线程 if (i == 0) continue; /* Thread 0 is the main thread. */ /* Things we do only for the additional threads. */ pthread_t tid; // 初始化io_threads_mutex数组 pthread_mutex_init(&io_threads_mutex[i],NULL); // 初始化io_threads_pending数组 setIOPendingCount(i, 0); // 主线程在启动 I/O 线程的时候会默认先锁住它,直到有 I/O 任务才唤醒它。 pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ // 调用pthread_create函数创建IO线程,线程运行函数为IOThreadMain if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); exit(1); } io_threads[i] = tid; } }
可以看到在 initThreadedIO 中完成了对下面四个数组的初始化工作
io_threads_list 数组:保存了每个 IO 线程要处理的客户端,将数组每个元素初始化为一个 List 类型的列表;
io_threads_pending 数组:保存等待每个 IO 线程处理的客户端个数;
io_threads_mutex 数组:保存线程互斥锁;
io_threads 数组:保存每个 IO 线程的描述符。
Redis server
在和一个客户端建立连接后,就开始了监听客户端的可读事件,处理可读事件的回调函数就是 readQueryFromClient
// https://github.com/redis/redis/blob/6.2/src/networking.c#L2219 void readQueryFromClient(connection *conn) { client *c = connGetPrivateData(conn); int nread, readlen; size_t qblen; /* Check if we want to read from the client later when exiting from * the event loop. This is the case if threaded I/O is enabled. */ // 判断是否从客户端延迟读取数据 if (postponeClientRead(c)) return; ... } // https://github.com/redis/redis/blob/6.2/src/networking.c#L3746 int postponeClientRead(client *c) { // 当多线程 I/O 模式开启、主线程没有在处理阻塞任务时,将 client 加入异步队列。 if (server.io_threads_active && server.io_threads_do_reads && !ProcessingEventsWhileBlocked && !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED))) { // 给客户端的flag添加CLIENT_PENDING_READ标记,表示推迟该客户端的读操作 c->flags |= CLIENT_PENDING_READ; // 将可获得加入clients_pending_write列表 listAddNodeHead(server.clients_pending_read,c); return 1; } else { return 0; } }
使用 clients_pending_read 保存了需要进行延迟读操作的客户端之后,这些客户端又是如何分配给多 IO 线程执行的呢?
handleClientsWithPendingWritesUsingThreads 函数:该函数主要负责将 clients_pending_write 列表中的客户端分配给 IO 线程进行处理。
看下如何实现
// https://github.com/redis/redis/blob/6.2/src/networking.c#L3766 int handleClientsWithPendingReadsUsingThreads(void) { // 当多线程 I/O 模式开启,才能执行下面的流程 if (!server.io_threads_active || !server.io_threads_do_reads) return 0; int processed = listLength(server.clients_pending_read); if (processed == 0) return 0; // 遍历待读取的 client 队列 clients_pending_read, // 根据IO线程的数量,让clients_pending_read中客户端数量对IO线程进行取模运算 // 取模的结果就是客户端分配给对应IO线程的编号 listIter li; listNode *ln; listRewind(server.clients_pending_read,&li); int item_id = 0; while((ln = listNext(&li))) { client *c = listNodeValue(ln); int target_id = item_id % server.io_threads_num; listAddNodeTail(io_threads_list[target_id],c); item_id++; } // 设置当前 I/O 操作为读取操作,给每个 I/O 线程的计数器设置分配的任务数量, // 让 I/O 线程可以开始工作:只读取和解析命令,不执行 io_threads_op = IO_THREADS_OP_READ; for (int j = 1; j < server.io_threads_num; j++) { int count = listLength(io_threads_list[j]); setIOPendingCount(j, count); } // 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。 listRewind(io_threads_list[0],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); readQueryFromClient(c->conn); } listEmpty(io_threads_list[0]); // 忙轮询,等待所有 IO 线程完成待读客户端的处理 while(1) { unsigned long pending = 0; for (int j = 1; j < server.io_threads_num; j++) pending += getIOPendingCount(j); if (pending == 0) break; } // 遍历待读取的 client 队列,清除 CLIENT_PENDING_READ标记, // 然后解析并执行所有 client 的命令。 while(listLength(server.clients_pending_read)) { ln = listFirst(server.clients_pending_read); client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_READ; listDelNode(server.clients_pending_read,ln); serverAssert(!(c->flags & CLIENT_BLOCKED)); // client 的第一条命令已经被解析好了,直接尝试执行。 if (processPendingCommandsAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid * processing the client later. So we just go * to the next. */ continue; } // 解析并执行 client 命令 processInputBuffer(c); // 命令执行完成之后,如果 client 中有响应数据需要回写到客户端,则将 client 加入到待写出队列 clients_pending_write if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c)) clientInstallWriteHandler(c); } /* Update processed count on server */ server.stat_io_reads_processed += processed; return processed; }
1、当客户端发送命令请求之后,会触发 Redis 主线程的事件循环,命令处理器 readQueryFromClient 被回调,多线程模式下,则会把 client 加入到 clients_pending_read 任务队列中去,后面主线程再分配到 I/O 线程去读取客户端请求命令;
2、主线程会根据 clients_pending_read 中客户端数量对IO线程进行取模运算,取模的结果就是客户端分配给对应IO线程的编号;
3、忙轮询,等待所有的线程完成读取客户端命令的操作,这一步用到了多线程的请求;
4、遍历 clients_pending_read,执行所有 client 的命令,这里就是在主线程中执行的,命令的执行是单线程的操作。
完成命令的读取、解析以及执行之后,客户端命令的响应数据已经存入 client->buf 或者 client->reply 中。
主循环在捕获 IO 事件的时候,beforeSleep 函数会被调用,进而调用 handleClientsWithPendingWritesUsingThreads ,写回响应数据给客户端。
// https://github.com/redis/redis/blob/6.2/src/networking.c#L3662 int handleClientsWithPendingWritesUsingThreads(void) { int processed = listLength(server.clients_pending_write); if (processed == 0) return 0; /* Return ASAP if there are no clients. */ // 如果用户设置的 I/O 线程数等于 1 或者当前 clients_pending_write 队列中待写出的 client // 数量不足 I/O 线程数的两倍,则不用多线程的逻辑,让所有 I/O 线程进入休眠, // 直接在主线程把所有 client 的相应数据回写到客户端。 if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) { return handleClientsWithPendingWrites(); } // 唤醒正在休眠的 I/O 线程(如果有的话)。 if (!server.io_threads_active) startThreadedIO(); /* Distribute the clients across N different lists. */ // 和上面的handleClientsWithPendingReadsUsingThreads中的操作一样分配客户端给IO线程 listIter li; listNode *ln; listRewind(server.clients_pending_write,&li); int item_id = 0; while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; /* Remove clients from the list of pending writes since * they are going to be closed ASAP. */ if (c->flags & CLIENT_CLOSE_ASAP) { listDelNode(server.clients_pending_write, ln); continue; } int target_id = item_id % server.io_threads_num; listAddNodeTail(io_threads_list[target_id],c); item_id++; } // 设置当前 I/O 操作为写出操作,给每个 I/O 线程的计数器设置分配的任务数量, // 让 I/O 线程可以开始工作,把写出缓冲区(client->buf 或 c->reply)中的响应数据回写到客户端。 // 可以看到写回操作也是多线程执行的 io_threads_op = IO_THREADS_OP_WRITE; for (int j = 1; j < server.io_threads_num; j++) { int count = listLength(io_threads_list[j]); setIOPendingCount(j, count); } // 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。 listRewind(io_threads_list[0],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); writeToClient(c,0); } listEmpty(io_threads_list[0]); /* Wait for all the other threads to end their work. */ // 等待所有的线程完成对应的工作 while(1) { unsigned long pending = 0; for (int j = 1; j < server.io_threads_num; j++) pending += getIOPendingCount(j); if (pending == 0) break; } // 最后再遍历一次 clients_pending_write 队列,检查是否还有 client 的写出缓冲区中有残留数据, // 如果有,那就为 client 注册一个命令回复器 sendReplyToClient,等待客户端写就绪再继续把数据回写。 listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); // 检查 client 的写出缓冲区是否还有遗留数据。 if (clientHasPendingReplies(c) && connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) { freeClientAsync(c); } } listEmpty(server.clients_pending_write); /* Update processed count on server */ server.stat_io_writes_processed += processed; return processed; }
1、也是会将 client 分配给所有的 IO 线程;
2、忙轮询,等待所有的线程将缓存中的数据写回给客户端,这里写回操作使用的多线程;
3、最后再遍历 clients_pending_write,为那些还残留有响应数据的 client 注册命令回复处理器 sendReplyToClient,等待客户端可写之后在事件循环中继续回写残余的响应数据。
通过上面的分析可以得出结论,Redis 多IO线程中多线程的应用
1、解析客户端的命令的时候用到了多线程,但是对于客户端命令的执行,使用的还是单线程;
2、给客户端回复数据的时候,使用到了多线程。
来总结下 Redis 中多线程的执行过程
1、Redis Server 启动后,主线程会启动一个时间循环(Event Loop),持续监听事件;
2、client 到 server 的新连接,会调用 acceptTcpHandler 函数,之后会注册读事件 readQueryFromClient 函数,client 发给 server 的数据,都会在这个函数处理;
3、客户端发送给服务端的数据,不会类似 6.0 之前的版本使用 socket 直接去读,而是会将 client 放入到 clients_pending_read 中,里面保存了需要进行延迟读操作的客户端;
4、处理 clients_pending_read 的函数 handleClientsWithPendingReadsUsingThreads,在每次事件循环的时候都会调用;
5、命令执行完成以后,回复的内容还是会被写入到 client 的缓存区中,这些 client 和6.0之前的版本处理方式一样,也是会被放入到 clients_pending_write(待写回数据的客户端);
6、6.0 对于clients_pending_write 的处理使用到了多线程;
通过上面的分析,我们知道,Redis 的主线程是单线程执行的,所有 Redis 中的单命令,都是原子性的。
所以对于一些场景的操作尽量去使用 Redis 中单命令去完成,就能保证命令执行的原子性。
比如对于上面的读取-修改-写回
操作可以使用 Redis 中的原子计数器, INCRBY(自增)、DECRBR(自减)、INCR(加1) 和 DECR(减1) 等命令。
这些命令可以直接帮助我们处理并发控制
127.0.0.1:6379> incr test-1 (integer) 1 127.0.0.1:6379> incr test-1 (integer) 2 127.0.0.1:6379> incr test-1 (integer) 3
分析下源码,看看这个命令是如何实现的
// https://github.com/redis/redis/blob/6.2/src/t_string.c#L617 void incrCommand(client *c) { incrDecrCommand(c,1); } void decrCommand(client *c) { incrDecrCommand(c,-1); } void incrbyCommand(client *c) { long long incr; if (getLongLongFromObjectOrReply(c, c->argv[2], &incr, NULL) != C_OK) return; incrDecrCommand(c,incr); } void decrbyCommand(client *c) { long long incr; if (getLongLongFromObjectOrReply(c, c->argv[2], &incr, NULL) != C_OK) return; incrDecrCommand(c,-incr); }
可以看到 INCRBY(自增)、DECRBR(自减)、INCR(加1) 和 DECR(减1)这几个命令最终都是调用的 incrDecrCommand
// https://github.com/redis/redis/blob/6.2/src/t_string.c#L579 void incrDecrCommand(client *c, long long incr) { long long value, oldvalue; robj *o, *new; // 查找有没有对应的键值 o = lookupKeyWrite(c->db,c->argv[1]); // 判断类型,如果value对象不是字符串类型,直接返回 if (checkType(c,o,OBJ_STRING)) return; // 将字符串类型的value转换为longlong类型保存在value中 if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return; // 备份旧的value oldvalue = value; // 判断 incr 的值是否超过longlong类型所能表示的范围 // 长度的范围,十进制 64 位有符号整数 if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) || (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) { addReplyError(c,"increment or decrement would overflow"); return; } // 计算新的 value值 value += incr; if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT && (value < 0 || value >= OBJ_SHARED_INTEGERS) && value >= LONG_MIN && value <= LONG_MAX) { new = o; o->ptr = (void*)((long)value); } else { new = createStringObjectFromLongLongForValue(value); // 如果之前的 value 对象存在 if (o) { // 重写为 new 的值 dbOverwrite(c->db,c->argv[1],new); } else { // 如果之前没有对应的 value,新设置 value 的值 dbAdd(c->db,c->argv[1],new); } } // 进行通知 signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id); server.dirty++; addReply(c,shared.colon); addReply(c,new); addReply(c,shared.crlf); }
1、Redis 中的命令执行都是单线程的,所以单命令的执行都是原子性的;
2、虽然 Redis6.0 版本引入了多线程,但是仅是在接收客户端的命令和回复客户端的数据用到了多线程,实际命令的执行还是单线程在处理;
推荐学习:Redis视频教程
以上是詳細解析Redis中命令的原子性的詳細內容。更多資訊請關注PHP中文網其他相關文章!