redis源代码分析25–VM(下)
这一节介绍下redis中的多线程机制。 先看看多线程换出的机制。 serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThrea
这一节介绍下redis中的多线程机制。
先看看多线程换出的机制。
serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThreaded。
static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) { iojob *j; assert(key->storage == REDIS_VM_MEMORY); assert(key->refcount == 1); j = zmalloc(sizeof(*j)); j->type = REDIS_IOJOB_PREPARE_SWAP; j->db = db; j->key = key; j->val = val; incrRefCount(val); j->canceled = 0; j->thread = (pthread_t) -1; key->storage = REDIS_VM_SWAPPING; lockThreadedIO(); queueIOJob(j); unlockThreadedIO(); return REDIS_OK; }
vmSwapObjectThreaded 会创建一个类型为REDIS_IOJOB_PREPARE_SWAP的job,然后使用queueIOJob来排队。而queueIOJob所做的主要工作就是就是将新job加入到server.io_newjobs,并在创建的线程数还没超过配置值时,创建新的线程。
/* This function must be called while with threaded IO locked */ static void queueIOJob(iojob *j) { redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n", (void*)j, j->type, (char*)j->key->ptr); listAddNodeTail(server.io_newjobs,j); if (server.io_active_threads <p>从spawnIOThread中可以知道,新线程的入口点是IOThreadEntryPoint。</p> <pre class="brush:php;toolbar:false"> static void spawnIOThread(void) { pthread_t thread; sigset_t mask, omask; int err; sigemptyset(&mask); sigaddset(&mask,SIGCHLD); sigaddset(&mask,SIGHUP); sigaddset(&mask,SIGPIPE); pthread_sigmask(SIG_SETMASK, &mask, &omask); while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) { redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s", strerror(err)); usleep(1000000); } pthread_sigmask(SIG_SETMASK, &omask, NULL); server.io_active_threads++; }
IOThreadEntryPoint会将io_newjobs中的job移入server.io_processing,然后在做完job类型的工作后(加载value/计算value所需交换页数/换出value),将job从server.io_processing移入io_processed中。然后往 server.io_ready_pipe_write所在的管道(io_ready_pipe_read、io_ready_pipe_write组成管道的两端)写入一个字节,让睡眠中的vmThreadedIOCompletedJob继续运行,该函数会做些后续工作。
static void *IOThreadEntryPoint(void *arg) { iojob *j; listNode *ln; REDIS_NOTUSED(arg); pthread_detach(pthread_self()); while(1) { /* Get a new job to process */ lockThreadedIO(); if (listLength(server.io_newjobs) == 0) { /* No new jobs in queue, exit. */ redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do", (long) pthread_self()); server.io_active_threads--; unlockThreadedIO(); return NULL; } ln = listFirst(server.io_newjobs); j = ln->value; listDelNode(server.io_newjobs,ln); /* Add the job in the processing queue */ j->thread = pthread_self(); listAddNodeTail(server.io_processing,j); ln = listLast(server.io_processing); /* We use ln later to remove it */ unlockThreadedIO(); redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'", (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr); /* Process the Job */ if (j->type == REDIS_IOJOB_LOAD) { j->val = vmReadObjectFromSwap(j->page,j->key->vtype); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { FILE *fp = fopen("/dev/null","w+"); j->pages = rdbSavedObjectPages(j->val,fp); fclose(fp); } else if (j->type == REDIS_IOJOB_DO_SWAP) { if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR) j->canceled = 1; } /* Done: insert the job into the processed queue */ redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)", (long) pthread_self(), (void*)j, (char*)j->key->ptr); lockThreadedIO(); listDelNode(server.io_processing,ln); listAddNodeTail(server.io_processed,j); unlockThreadedIO(); /* Signal the main thread there is new stuff to process */ assert(write(server.io_ready_pipe_write,"x",1) == 1); } return NULL; /* never reached */ } static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask) { char buf[1]; int retval, processed = 0, toprocess = -1, trytoswap = 1; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); if (privdata != NULL) trytoswap = 0; /* check the comments above... */ /* For every byte we read in the read side of the pipe, there is one * I/O job completed to process. */ while((retval = read(fd,buf,1)) == 1) { iojob *j; listNode *ln; robj *key; struct dictEntry *de; redisLog(REDIS_DEBUG,"Processing I/O completed job"); /* Get the processed element (the oldest one) */ lockThreadedIO(); assert(listLength(server.io_processed) != 0); if (toprocess == -1) { toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100; if (toprocess value; listDelNode(server.io_processed,ln); unlockThreadedIO(); /* If this job is marked as canceled, just ignore it */ if (j->canceled) { freeIOJob(j); continue; } /* Post process it in the main thread, as there are things we * can do just here to avoid race conditions and/or invasive locks */ redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount); de = dictFind(j->db->dict,j->key); assert(de != NULL); key = dictGetEntryKey(de); if (j->type == REDIS_IOJOB_LOAD) { redisDb *db; /* Key loaded, bring it at home */ key->storage = REDIS_VM_MEMORY; key->vm.atime = server.unixtime; vmMarkPagesFree(key->vm.page,key->vm.usedpages); redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)", (unsigned char*) key->ptr); server.vm_stats_swapped_objects--; server.vm_stats_swapins++; dictGetEntryVal(de) = j->val; incrRefCount(j->val); db = j->db; freeIOJob(j); /* Handle clients waiting for this key to be loaded. */ handleClientsBlockedOnSwappedKey(db,key); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { /* Now we know the amount of pages required to swap this object. * Let's find some space for it, and queue this task again * rebranded as REDIS_IOJOB_DO_SWAP. */ if (!vmCanSwapOut() || vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR) { /* Ooops... no space or we can't swap as there is * a fork()ed Redis trying to save stuff on disk. */ freeIOJob(j); key->storage = REDIS_VM_MEMORY; /* undo operation */ } else { /* Note that we need to mark this pages as used now, * if the job will be canceled, we'll mark them as freed * again. */ vmMarkPagesUsed(j->page,j->pages); j->type = REDIS_IOJOB_DO_SWAP; lockThreadedIO(); queueIOJob(j); unlockThreadedIO(); } } else if (j->type == REDIS_IOJOB_DO_SWAP) { robj *val; /* Key swapped. We can finally free some memory. */ if (key->storage != REDIS_VM_SWAPPING) { printf("key->storage: %d\n",key->storage); printf("key->name: %s\n",(char*)key->ptr); printf("key->refcount: %d\n",key->refcount); printf("val: %p\n",(void*)j->val); printf("val->type: %d\n",j->val->type); printf("val->ptr: %s\n",(char*)j->val->ptr); } redisAssert(key->storage == REDIS_VM_SWAPPING); val = dictGetEntryVal(de); key->vm.page = j->page; key->vm.usedpages = j->pages; key->storage = REDIS_VM_SWAPPED; key->vtype = j->val->type; decrRefCount(val); /* Deallocate the object from memory. */ dictGetEntryVal(de) = NULL; redisLog(REDIS_DEBUG, "VM: object %s swapped out at %lld (%lld pages) (threaded)", (unsigned char*) key->ptr, (unsigned long long) j->page, (unsigned long long) j->pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; freeIOJob(j); /* Put a few more swap requests in queue if we are still * out of memory */ if (trytoswap && vmCanSwapOut() && zmalloc_used_memory() > server.vm_max_memory) { int more = 1; while(more) { lockThreadedIO(); more = listLength(server.io_newjobs) <p class="copyright"> 原文地址:redis源代码分析25–VM(下), 感谢原作者分享。 </p>

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









Redisクラスターモードは、シャードを介してRedisインスタンスを複数のサーバーに展開し、スケーラビリティと可用性を向上させます。構造の手順は次のとおりです。異なるポートで奇妙なRedisインスタンスを作成します。 3つのセンチネルインスタンスを作成し、Redisインスタンスを監視し、フェールオーバーを監視します。 Sentinel構成ファイルを構成し、Redisインスタンス情報とフェールオーバー設定の監視を追加します。 Redisインスタンス構成ファイルを構成し、クラスターモードを有効にし、クラスター情報ファイルパスを指定します。各Redisインスタンスの情報を含むnodes.confファイルを作成します。クラスターを起動し、CREATEコマンドを実行してクラスターを作成し、レプリカの数を指定します。クラスターにログインしてクラスター情報コマンドを実行して、クラスターステータスを確認します。作る

Redisデータをクリアする方法:Flushallコマンドを使用して、すべての重要な値をクリアします。 FlushDBコマンドを使用して、現在選択されているデータベースのキー値をクリアします。 [選択]を使用してデータベースを切り替え、FlushDBを使用して複数のデータベースをクリアします。 DELコマンドを使用して、特定のキーを削除します。 Redis-CLIツールを使用してデータをクリアします。

Redis指令を使用するには、次の手順が必要です。Redisクライアントを開きます。コマンド(動詞キー値)を入力します。必要なパラメーターを提供します(指示ごとに異なります)。 Enterを押してコマンドを実行します。 Redisは、操作の結果を示す応答を返します(通常はOKまたは-ERR)。

Redisのキューを読むには、キュー名を取得し、LPOPコマンドを使用して要素を読み、空のキューを処理する必要があります。特定の手順は次のとおりです。キュー名を取得します:「キュー:キュー」などの「キュー:」のプレフィックスで名前を付けます。 LPOPコマンドを使用します。キューのヘッドから要素を排出し、LPOP Queue:My-Queueなどの値を返します。空のキューの処理:キューが空の場合、LPOPはnilを返し、要素を読む前にキューが存在するかどうかを確認できます。

Redisを使用して操作をロックするには、setnxコマンドを介してロックを取得し、有効期限を設定するために有効期限コマンドを使用する必要があります。特定の手順は次のとおりです。(1)SETNXコマンドを使用して、キー価値ペアを設定しようとします。 (2)expireコマンドを使用して、ロックの有効期限を設定します。 (3)Delコマンドを使用して、ロックが不要になったときにロックを削除します。

Redisソースコードを理解する最良の方法は、段階的に進むことです。Redisの基本に精通してください。開始点として特定のモジュールまたは機能を選択します。モジュールまたは機能のエントリポイントから始めて、行ごとにコードを表示します。関数コールチェーンを介してコードを表示します。 Redisが使用する基礎となるデータ構造に精通してください。 Redisが使用するアルゴリズムを特定します。

Redisは、メッセージミドルウェアとして、生産消費モデルをサポートし、メッセージを持続し、信頼できる配信を確保できます。メッセージミドルウェアとしてRedisを使用すると、低遅延、信頼性の高いスケーラブルなメッセージングが可能になります。

Redisサーバーを起動する手順には、以下が含まれます。オペレーティングシステムに従ってRedisをインストールします。 Redis-Server(Linux/Macos)またはRedis-Server.exe(Windows)を介してRedisサービスを開始します。 Redis-Cli ping(Linux/macos)またはRedis-Cli.exePing(Windows)コマンドを使用して、サービスステータスを確認します。 Redis-Cli、Python、node.jsなどのRedisクライアントを使用して、サーバーにアクセスします。
