redis源代码分析25–VM(下)
这一节介绍下redis中的多线程机制。 先看看多线程换出的机制。 serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThrea
这一节介绍下redis中的多线程机制。
先看看多线程换出的机制。
serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThreaded。
static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) { iojob *j; assert(key->storage == REDIS_VM_MEMORY); assert(key->refcount == 1); j = zmalloc(sizeof(*j)); j->type = REDIS_IOJOB_PREPARE_SWAP; j->db = db; j->key = key; j->val = val; incrRefCount(val); j->canceled = 0; j->thread = (pthread_t) -1; key->storage = REDIS_VM_SWAPPING; lockThreadedIO(); queueIOJob(j); unlockThreadedIO(); return REDIS_OK; }
vmSwapObjectThreaded 会创建一个类型为REDIS_IOJOB_PREPARE_SWAP的job,然后使用queueIOJob来排队。而queueIOJob所做的主要工作就是就是将新job加入到server.io_newjobs,并在创建的线程数还没超过配置值时,创建新的线程。
/* This function must be called while with threaded IO locked */ static void queueIOJob(iojob *j) { redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n", (void*)j, j->type, (char*)j->key->ptr); listAddNodeTail(server.io_newjobs,j); if (server.io_active_threads <p>从spawnIOThread中可以知道,新线程的入口点是IOThreadEntryPoint。</p> <pre class="brush:php;toolbar:false"> static void spawnIOThread(void) { pthread_t thread; sigset_t mask, omask; int err; sigemptyset(&mask); sigaddset(&mask,SIGCHLD); sigaddset(&mask,SIGHUP); sigaddset(&mask,SIGPIPE); pthread_sigmask(SIG_SETMASK, &mask, &omask); while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) { redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s", strerror(err)); usleep(1000000); } pthread_sigmask(SIG_SETMASK, &omask, NULL); server.io_active_threads++; }
IOThreadEntryPoint会将io_newjobs中的job移入server.io_processing,然后在做完job类型的工作后(加载value/计算value所需交换页数/换出value),将job从server.io_processing移入io_processed中。然后往 server.io_ready_pipe_write所在的管道(io_ready_pipe_read、io_ready_pipe_write组成管道的两端)写入一个字节,让睡眠中的vmThreadedIOCompletedJob继续运行,该函数会做些后续工作。
static void *IOThreadEntryPoint(void *arg) { iojob *j; listNode *ln; REDIS_NOTUSED(arg); pthread_detach(pthread_self()); while(1) { /* Get a new job to process */ lockThreadedIO(); if (listLength(server.io_newjobs) == 0) { /* No new jobs in queue, exit. */ redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do", (long) pthread_self()); server.io_active_threads--; unlockThreadedIO(); return NULL; } ln = listFirst(server.io_newjobs); j = ln->value; listDelNode(server.io_newjobs,ln); /* Add the job in the processing queue */ j->thread = pthread_self(); listAddNodeTail(server.io_processing,j); ln = listLast(server.io_processing); /* We use ln later to remove it */ unlockThreadedIO(); redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'", (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr); /* Process the Job */ if (j->type == REDIS_IOJOB_LOAD) { j->val = vmReadObjectFromSwap(j->page,j->key->vtype); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { FILE *fp = fopen("/dev/null","w+"); j->pages = rdbSavedObjectPages(j->val,fp); fclose(fp); } else if (j->type == REDIS_IOJOB_DO_SWAP) { if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR) j->canceled = 1; } /* Done: insert the job into the processed queue */ redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)", (long) pthread_self(), (void*)j, (char*)j->key->ptr); lockThreadedIO(); listDelNode(server.io_processing,ln); listAddNodeTail(server.io_processed,j); unlockThreadedIO(); /* Signal the main thread there is new stuff to process */ assert(write(server.io_ready_pipe_write,"x",1) == 1); } return NULL; /* never reached */ } static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask) { char buf[1]; int retval, processed = 0, toprocess = -1, trytoswap = 1; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); if (privdata != NULL) trytoswap = 0; /* check the comments above... */ /* For every byte we read in the read side of the pipe, there is one * I/O job completed to process. */ while((retval = read(fd,buf,1)) == 1) { iojob *j; listNode *ln; robj *key; struct dictEntry *de; redisLog(REDIS_DEBUG,"Processing I/O completed job"); /* Get the processed element (the oldest one) */ lockThreadedIO(); assert(listLength(server.io_processed) != 0); if (toprocess == -1) { toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100; if (toprocess value; listDelNode(server.io_processed,ln); unlockThreadedIO(); /* If this job is marked as canceled, just ignore it */ if (j->canceled) { freeIOJob(j); continue; } /* Post process it in the main thread, as there are things we * can do just here to avoid race conditions and/or invasive locks */ redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount); de = dictFind(j->db->dict,j->key); assert(de != NULL); key = dictGetEntryKey(de); if (j->type == REDIS_IOJOB_LOAD) { redisDb *db; /* Key loaded, bring it at home */ key->storage = REDIS_VM_MEMORY; key->vm.atime = server.unixtime; vmMarkPagesFree(key->vm.page,key->vm.usedpages); redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)", (unsigned char*) key->ptr); server.vm_stats_swapped_objects--; server.vm_stats_swapins++; dictGetEntryVal(de) = j->val; incrRefCount(j->val); db = j->db; freeIOJob(j); /* Handle clients waiting for this key to be loaded. */ handleClientsBlockedOnSwappedKey(db,key); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { /* Now we know the amount of pages required to swap this object. * Let's find some space for it, and queue this task again * rebranded as REDIS_IOJOB_DO_SWAP. */ if (!vmCanSwapOut() || vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR) { /* Ooops... no space or we can't swap as there is * a fork()ed Redis trying to save stuff on disk. */ freeIOJob(j); key->storage = REDIS_VM_MEMORY; /* undo operation */ } else { /* Note that we need to mark this pages as used now, * if the job will be canceled, we'll mark them as freed * again. */ vmMarkPagesUsed(j->page,j->pages); j->type = REDIS_IOJOB_DO_SWAP; lockThreadedIO(); queueIOJob(j); unlockThreadedIO(); } } else if (j->type == REDIS_IOJOB_DO_SWAP) { robj *val; /* Key swapped. We can finally free some memory. */ if (key->storage != REDIS_VM_SWAPPING) { printf("key->storage: %d\n",key->storage); printf("key->name: %s\n",(char*)key->ptr); printf("key->refcount: %d\n",key->refcount); printf("val: %p\n",(void*)j->val); printf("val->type: %d\n",j->val->type); printf("val->ptr: %s\n",(char*)j->val->ptr); } redisAssert(key->storage == REDIS_VM_SWAPPING); val = dictGetEntryVal(de); key->vm.page = j->page; key->vm.usedpages = j->pages; key->storage = REDIS_VM_SWAPPED; key->vtype = j->val->type; decrRefCount(val); /* Deallocate the object from memory. */ dictGetEntryVal(de) = NULL; redisLog(REDIS_DEBUG, "VM: object %s swapped out at %lld (%lld pages) (threaded)", (unsigned char*) key->ptr, (unsigned long long) j->page, (unsigned long long) j->pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; freeIOJob(j); /* Put a few more swap requests in queue if we are still * out of memory */ if (trytoswap && vmCanSwapOut() && zmalloc_used_memory() > server.vm_max_memory) { int more = 1; while(more) { lockThreadedIO(); more = listLength(server.io_newjobs) <p class="copyright"> 原文地址:redis源代码分析25–VM(下), 感谢原作者分享。 </p>

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

Redis集群模式通過分片將Redis實例部署到多個服務器,提高可擴展性和可用性。搭建步驟如下:創建奇數個Redis實例,端口不同;創建3個sentinel實例,監控Redis實例並進行故障轉移;配置sentinel配置文件,添加監控Redis實例信息和故障轉移設置;配置Redis實例配置文件,啟用集群模式並指定集群信息文件路徑;創建nodes.conf文件,包含各Redis實例的信息;啟動集群,執行create命令創建集群並指定副本數量;登錄集群執行CLUSTER INFO命令驗證集群狀態;使

要從 Redis 讀取隊列,需要獲取隊列名稱、使用 LPOP 命令讀取元素,並處理空隊列。具體步驟如下:獲取隊列名稱:以 "queue:" 前綴命名,如 "queue:my-queue"。使用 LPOP 命令:從隊列頭部彈出元素並返回其值,如 LPOP queue:my-queue。處理空隊列:如果隊列為空,LPOP 返回 nil,可先檢查隊列是否存在再讀取元素。

如何清空 Redis 數據:使用 FLUSHALL 命令清除所有鍵值。使用 FLUSHDB 命令清除當前選定數據庫的鍵值。使用 SELECT 切換數據庫,再使用 FLUSHDB 清除多個數據庫。使用 DEL 命令刪除特定鍵。使用 redis-cli 工具清空數據。

在CentOS系統上,您可以通過修改Redis配置文件或使用Redis命令來限制Lua腳本的執行時間,從而防止惡意腳本佔用過多資源。方法一:修改Redis配置文件定位Redis配置文件:Redis配置文件通常位於/etc/redis/redis.conf。編輯配置文件:使用文本編輯器(例如vi或nano)打開配置文件:sudovi/etc/redis/redis.conf設置Lua腳本執行時間限制:在配置文件中添加或修改以下行,設置Lua腳本的最大執行時間(單位:毫秒)

使用 Redis 命令行工具 (redis-cli) 可通過以下步驟管理和操作 Redis:連接到服務器,指定地址和端口。使用命令名稱和參數向服務器發送命令。使用 HELP 命令查看特定命令的幫助信息。使用 QUIT 命令退出命令行工具。

Redis數據過期策略有兩種:定期刪除:定期掃描刪除過期鍵,可通過 expired-time-cap-remove-count、expired-time-cap-remove-delay 參數設置。惰性刪除:僅在讀取或寫入鍵時檢查刪除過期鍵,可通過 lazyfree-lazy-eviction、lazyfree-lazy-expire、lazyfree-lazy-user-del 參數設置。

在Debian系統中,readdir系統調用用於讀取目錄內容。如果其性能表現不佳,可嘗試以下優化策略:精簡目錄文件數量:盡可能將大型目錄拆分成多個小型目錄,降低每次readdir調用處理的項目數量。啟用目錄內容緩存:構建緩存機制,定期或在目錄內容變更時更新緩存,減少對readdir的頻繁調用。內存緩存(如Memcached或Redis)或本地緩存(如文件或數據庫)均可考慮。採用高效數據結構:如果自行實現目錄遍歷,選擇更高效的數據結構(例如哈希表而非線性搜索)存儲和訪問目錄信

Redis計數器是一種使用Redis鍵值對存儲來實現計數操作的機制,包含以下步驟:創建計數器鍵、增加計數、減少計數、重置計數和獲取計數。 Redis計數器的優勢包括速度快、高並發、持久性和簡單易用。它可用於用戶訪問計數、實時指標跟踪、遊戲分數和排名以及訂單處理計數等場景。
