static
void *IOThreadEntryPoint(void *arg) {
iojob *j;
listNode *ln;
REDIS_NOTUSED(arg);
pthread_detach(pthread_self());
while
(1) {
lockThreadedIO();
if
(listLength(server.io_newjobs) == 0) {
redisLog(REDIS_DEBUG,
"Thread %ld exiting, nothing to do"
,
(long) pthread_self());
server.io_active_threads--;
unlockThreadedIO();
return
NULL;
}
ln = listFirst(server.io_newjobs);
j = ln->value;
listDelNode(server.io_newjobs,ln);
j->thread = pthread_self();
listAddNodeTail(server.io_processing,j);
ln = listLast(server.io_processing);
unlockThreadedIO();
redisLog(REDIS_DEBUG,
"Thread %ld got a new job (type %d): %p about key '%s'"
,
(long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
if
(j->type == REDIS_IOJOB_LOAD) {
j->val = vmReadObjectFromSwap(j->page,j->key->vtype);
}
else
if
(j->type == REDIS_IOJOB_PREPARE_SWAP) {
FILE *fp =
fopen
(
"/dev/null"
,
"w+"
);
j->pages = rdbSavedObjectPages(j->val,fp);
fclose(fp);
}
else
if
(j->type == REDIS_IOJOB_DO_SWAP) {
if
(vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
j->canceled = 1;
}
redisLog(REDIS_DEBUG,
"Thread %ld completed the job: %p (key %s)"
,
(long) pthread_self(), (void*)j, (char*)j->key->ptr);
lockThreadedIO();
listDelNode(server.io_processing,ln);
listAddNodeTail(server.io_processed,j);
unlockThreadedIO();
assert(write(server.io_ready_pipe_write,
"x"
,1) == 1);
}
return
NULL;
}
static
void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
int mask)
{
char buf[1];
int retval, processed = 0, toprocess = -1, trytoswap = 1;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
if
(privdata != NULL) trytoswap = 0;
while
((retval = read(fd,buf,1)) == 1) {
iojob *j;
listNode *ln;
robj *key;
struct dictEntry *de;
redisLog(REDIS_DEBUG,
"Processing I/O completed job"
);
lockThreadedIO();
assert(listLength(server.io_processed) != 0);
if
(toprocess == -1) {
toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100;
if
(toprocess value;
listDelNode(server.io_processed,ln);
unlockThreadedIO();
if
(j->canceled) {
freeIOJob(j);
continue
;
}
redisLog(REDIS_DEBUG,
"Job %p type: %d, key at %p (%s) refcount: %d\n"
, (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
de = dictFind(j->db->dict,j->key);
assert(de != NULL);
key = dictGetEntryKey(de);
if
(j->type == REDIS_IOJOB_LOAD) {
redisDb *db;
key->storage = REDIS_VM_MEMORY;
key->vm.atime = server.unixtime;
vmMarkPagesFree(key->vm.page,key->vm.usedpages);
redisLog(REDIS_DEBUG,
"VM: object %s loaded from disk (threaded)"
,
(unsigned char*) key->ptr);
server.vm_stats_swapped_objects--;
server.vm_stats_swapins++;
dictGetEntryVal(de) = j->val;
incrRefCount(j->val);
db = j->db;
freeIOJob(j);
handleClientsBlockedOnSwappedKey(db,key);
}
else
if
(j->type == REDIS_IOJOB_PREPARE_SWAP) {
if
(!vmCanSwapOut() ||
vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
{
freeIOJob(j);
key->storage = REDIS_VM_MEMORY;
}
else
{
vmMarkPagesUsed(j->page,j->pages);
j->type = REDIS_IOJOB_DO_SWAP;
lockThreadedIO();
queueIOJob(j);
unlockThreadedIO();
}
}
else
if
(j->type == REDIS_IOJOB_DO_SWAP) {
robj *val;
if
(key->storage != REDIS_VM_SWAPPING) {
printf(
"key->storage: %d\n"
,key->storage);
printf(
"key->name: %s\n"
,(char*)key->ptr);
printf(
"key->refcount: %d\n"
,key->refcount);
printf(
"val: %p\n"
,(void*)j->val);
printf(
"val->type: %d\n"
,j->val->type);
printf(
"val->ptr: %s\n"
,(char*)j->val->ptr);
}
redisAssert(key->storage == REDIS_VM_SWAPPING);
val = dictGetEntryVal(de);
key->vm.page = j->page;
key->vm.usedpages = j->pages;
key->storage = REDIS_VM_SWAPPED;
key->vtype = j->val->type;
decrRefCount(val);
dictGetEntryVal(de) = NULL;
redisLog(REDIS_DEBUG,
"VM: object %s swapped out at %lld (%lld pages) (threaded)"
,
(unsigned char*) key->ptr,
(unsigned long long) j->page, (unsigned long long) j->pages);
server.vm_stats_swapped_objects++;
server.vm_stats_swapouts++;
freeIOJob(j);
if
(trytoswap && vmCanSwapOut() &&
zmalloc_used_memory() > server.vm_max_memory)
{
int more = 1;
while
(more) {
lockThreadedIO();
more = listLength(server.io_newjobs)
<p
class
=
"copyright"
>
原文地址:redis源代码分析25–VM(下), 感谢原作者分享。
</p>