데이터 베이스 MySQL 튜토리얼 redis源代码分析25–VM(下)

redis源代码分析25–VM(下)

Jun 07, 2016 pm 04:26 PM
redis 소개하다 분석하다 소스 코드

这一节介绍下redis中的多线程机制。 先看看多线程换出的机制。 serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThrea

这一节介绍下redis中的多线程机制。

先看看多线程换出的机制。

serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThreaded。

static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
    iojob *j;
    assert(key->storage == REDIS_VM_MEMORY);
    assert(key->refcount == 1);
    j = zmalloc(sizeof(*j));
    j->type = REDIS_IOJOB_PREPARE_SWAP;
    j->db = db;
    j->key = key;
    j->val = val;
    incrRefCount(val);
    j->canceled = 0;
    j->thread = (pthread_t) -1;
    key->storage = REDIS_VM_SWAPPING;
    lockThreadedIO();
    queueIOJob(j);
    unlockThreadedIO();
    return REDIS_OK;
}
로그인 후 복사

vmSwapObjectThreaded 会创建一个类型为REDIS_IOJOB_PREPARE_SWAP的job,然后使用queueIOJob来排队。而queueIOJob所做的主要工作就是就是将新job加入到server.io_newjobs,并在创建的线程数还没超过配置值时,创建新的线程。

/* This function must be called while with threaded IO locked */
static void queueIOJob(iojob *j) {
    redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
        (void*)j, j->type, (char*)j->key->ptr);
    listAddNodeTail(server.io_newjobs,j);
    if (server.io_active_threads 
<p>从spawnIOThread中可以知道,新线程的入口点是IOThreadEntryPoint。</p>
<pre class="brush:php;toolbar:false">
static void spawnIOThread(void) {
    pthread_t thread;
    sigset_t mask, omask;
    int err;
    sigemptyset(&mask);
    sigaddset(&mask,SIGCHLD);
    sigaddset(&mask,SIGHUP);
    sigaddset(&mask,SIGPIPE);
    pthread_sigmask(SIG_SETMASK, &mask, &omask);
    while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) {
        redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s",
            strerror(err));
        usleep(1000000);
    }
    pthread_sigmask(SIG_SETMASK, &omask, NULL);
    server.io_active_threads++;
}
로그인 후 복사

IOThreadEntryPoint会将io_newjobs中的job移入server.io_processing,然后在做完job类型的工作后(加载value/计算value所需交换页数/换出value),将job从server.io_processing移入io_processed中。然后往 server.io_ready_pipe_write所在的管道(io_ready_pipe_read、io_ready_pipe_write组成管道的两端)写入一个字节,让睡眠中的vmThreadedIOCompletedJob继续运行,该函数会做些后续工作。

static void *IOThreadEntryPoint(void *arg) {
    iojob *j;
    listNode *ln;
    REDIS_NOTUSED(arg);
    pthread_detach(pthread_self());
    while(1) {
        /* Get a new job to process */
        lockThreadedIO();
        if (listLength(server.io_newjobs) == 0) {
            /* No new jobs in queue, exit. */
            redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
                (long) pthread_self());
            server.io_active_threads--;
            unlockThreadedIO();
            return NULL;
        }
        ln = listFirst(server.io_newjobs);
        j = ln->value;
        listDelNode(server.io_newjobs,ln);
        /* Add the job in the processing queue */
        j->thread = pthread_self();
        listAddNodeTail(server.io_processing,j);
        ln = listLast(server.io_processing); /* We use ln later to remove it */
        unlockThreadedIO();
        redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
            (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
        /* Process the Job */
        if (j->type == REDIS_IOJOB_LOAD) {
            j->val = vmReadObjectFromSwap(j->page,j->key->vtype);
        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
            FILE *fp = fopen("/dev/null","w+");
            j->pages = rdbSavedObjectPages(j->val,fp);
            fclose(fp);
        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
            if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
                j->canceled = 1;
        }
        /* Done: insert the job into the processed queue */
        redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
            (long) pthread_self(), (void*)j, (char*)j->key->ptr);
        lockThreadedIO();
        listDelNode(server.io_processing,ln);
        listAddNodeTail(server.io_processed,j);
        unlockThreadedIO();
        /* Signal the main thread there is new stuff to process */
        assert(write(server.io_ready_pipe_write,"x",1) == 1);
    }
    return NULL; /* never reached */
}
static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
            int mask)
{
    char buf[1];
    int retval, processed = 0, toprocess = -1, trytoswap = 1;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    if (privdata != NULL) trytoswap = 0; /* check the comments above... */
    /* For every byte we read in the read side of the pipe, there is one
     * I/O job completed to process. */
    while((retval = read(fd,buf,1)) == 1) {
        iojob *j;
        listNode *ln;
        robj *key;
        struct dictEntry *de;
        redisLog(REDIS_DEBUG,"Processing I/O completed job");
        /* Get the processed element (the oldest one) */
        lockThreadedIO();
        assert(listLength(server.io_processed) != 0);
        if (toprocess == -1) {
            toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100;
            if (toprocess value;
        listDelNode(server.io_processed,ln);
        unlockThreadedIO();
        /* If this job is marked as canceled, just ignore it */
        if (j->canceled) {
            freeIOJob(j);
            continue;
        }
        /* Post process it in the main thread, as there are things we
         * can do just here to avoid race conditions and/or invasive locks */
        redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
        de = dictFind(j->db->dict,j->key);
        assert(de != NULL);
        key = dictGetEntryKey(de);
        if (j->type == REDIS_IOJOB_LOAD) {
            redisDb *db;
            /* Key loaded, bring it at home */
            key->storage = REDIS_VM_MEMORY;
            key->vm.atime = server.unixtime;
            vmMarkPagesFree(key->vm.page,key->vm.usedpages);
            redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)",
                (unsigned char*) key->ptr);
            server.vm_stats_swapped_objects--;
            server.vm_stats_swapins++;
            dictGetEntryVal(de) = j->val;
            incrRefCount(j->val);
            db = j->db;
            freeIOJob(j);
            /* Handle clients waiting for this key to be loaded. */
            handleClientsBlockedOnSwappedKey(db,key);
        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
            /* Now we know the amount of pages required to swap this object.
             * Let's find some space for it, and queue this task again
             * rebranded as REDIS_IOJOB_DO_SWAP. */
            if (!vmCanSwapOut() ||
                vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
            {
                /* Ooops... no space or we can't swap as there is
                 * a fork()ed Redis trying to save stuff on disk. */
                freeIOJob(j);
                key->storage = REDIS_VM_MEMORY; /* undo operation */
            } else {
                /* Note that we need to mark this pages as used now,
                 * if the job will be canceled, we'll mark them as freed
                 * again. */
                vmMarkPagesUsed(j->page,j->pages);
                j->type = REDIS_IOJOB_DO_SWAP;
                lockThreadedIO();
                queueIOJob(j);
                unlockThreadedIO();
            }
        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
            robj *val;
            /* Key swapped. We can finally free some memory. */
            if (key->storage != REDIS_VM_SWAPPING) {
                printf("key->storage: %d\n",key->storage);
                printf("key->name: %s\n",(char*)key->ptr);
                printf("key->refcount: %d\n",key->refcount);
                printf("val: %p\n",(void*)j->val);
                printf("val->type: %d\n",j->val->type);
                printf("val->ptr: %s\n",(char*)j->val->ptr);
            }
            redisAssert(key->storage == REDIS_VM_SWAPPING);
            val = dictGetEntryVal(de);
            key->vm.page = j->page;
            key->vm.usedpages = j->pages;
            key->storage = REDIS_VM_SWAPPED;
            key->vtype = j->val->type;
            decrRefCount(val); /* Deallocate the object from memory. */
            dictGetEntryVal(de) = NULL;
            redisLog(REDIS_DEBUG,
                "VM: object %s swapped out at %lld (%lld pages) (threaded)",
                (unsigned char*) key->ptr,
                (unsigned long long) j->page, (unsigned long long) j->pages);
            server.vm_stats_swapped_objects++;
            server.vm_stats_swapouts++;
            freeIOJob(j);
            /* Put a few more swap requests in queue if we are still
             * out of memory */
            if (trytoswap && vmCanSwapOut() &&
                zmalloc_used_memory() > server.vm_max_memory)
            {
                int more = 1;
                while(more) {
                    lockThreadedIO();
                    more = listLength(server.io_newjobs) 
    <p class="copyright">
        原文地址:redis源代码分析25–VM(下), 感谢原作者分享。
    </p>
    
    


로그인 후 복사
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

AI Hentai Generator

AI Hentai Generator

AI Hentai를 무료로 생성하십시오.

인기 기사

R.E.P.O. 에너지 결정과 그들이하는 일 (노란색 크리스탈)
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 최고의 그래픽 설정
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 아무도들을 수없는 경우 오디오를 수정하는 방법
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌

뜨거운 도구

메모장++7.3.1

메모장++7.3.1

사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전

SublimeText3 중국어 버전

중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기

스튜디오 13.0.1 보내기

강력한 PHP 통합 개발 환경

드림위버 CS6

드림위버 CS6

시각적 웹 개발 도구

SublimeText3 Mac 버전

SublimeText3 Mac 버전

신 수준의 코드 편집 소프트웨어(SublimeText3)

Windows 11 10.0.22000.100 설치 시 발생하는 0x80242008 오류 해결 방법 Windows 11 10.0.22000.100 설치 시 발생하는 0x80242008 오류 해결 방법 May 08, 2024 pm 03:50 PM

1. [시작] 메뉴를 시작하여 [cmd]를 입력하고 [명령 프롬프트]를 마우스 오른쪽 버튼으로 클릭한 후 [관리자 권한으로 실행]을 선택합니다. 2. 다음 명령을 순서대로 입력합니다(주의 깊게 복사하여 붙여넣기): SCconfigwuauservstart=auto, Enter SCconfigbitsstart=auto, Enter 누르기 SCconfigcryptsvcstart=auto, Enter SCconfigtrustedinstallerstart=auto, Enter SCconfigwuauservtype=share, Enter netstopwuauserv , Enter netstopcryptS 누르기

Golang API 캐싱 전략 및 최적화 Golang API 캐싱 전략 및 최적화 May 07, 2024 pm 02:12 PM

GolangAPI의 캐싱 전략은 성능을 향상시키고 서버 부하를 줄일 수 있습니다. 일반적으로 사용되는 전략은 LRU, LFU, FIFO 및 TTL입니다. 최적화 기술에는 적절한 캐시 스토리지 선택, 계층적 캐싱, 무효화 관리, 모니터링 및 조정이 포함됩니다. 실제 사례에서 LRU 캐시는 데이터베이스에서 사용자 정보를 얻기 위한 API를 최적화하는 데 사용됩니다. 그렇지 않으면 캐시를 데이터베이스에서 얻은 후 업데이트할 수 있습니다.

PHP 개발의 캐싱 메커니즘 및 응용 실습 PHP 개발의 캐싱 메커니즘 및 응용 실습 May 09, 2024 pm 01:30 PM

PHP 개발에서 캐싱 메커니즘은 자주 액세스하는 데이터를 메모리나 디스크에 임시 저장하여 데이터베이스 액세스 횟수를 줄여 성능을 향상시킵니다. 캐시 유형에는 주로 메모리, 파일 및 데이터베이스 캐시가 포함됩니다. 캐싱은 내장 함수나 캐시_get() 및 Memcache와 같은 타사 라이브러리를 사용하여 PHP에서 구현할 수 있습니다. 일반적인 실제 응용 프로그램에는 쿼리 성능을 최적화하기 위한 데이터베이스 쿼리 결과 캐싱과 렌더링 속도를 높이기 위한 페이지 출력 캐싱이 포함됩니다. 캐싱 메커니즘은 웹사이트 응답 속도를 효과적으로 향상시키고, 사용자 경험을 향상시키며, 서버 부하를 줄입니다.

C++ 동시 프로그래밍: 스레드 고갈과 우선순위 반전을 방지하는 방법은 무엇입니까? C++ 동시 프로그래밍: 스레드 고갈과 우선순위 반전을 방지하는 방법은 무엇입니까? May 06, 2024 pm 05:27 PM

스레드 부족을 방지하려면 공정한 잠금을 사용하여 리소스를 공정하게 할당하거나 스레드 우선순위를 설정할 수 있습니다. 우선순위 역전 문제를 해결하려면 리소스를 보유한 스레드의 우선순위를 일시적으로 높이는 우선순위 상속을 사용하거나 리소스가 필요한 스레드의 우선순위를 높이는 잠금 승격을 사용할 수 있습니다.

Samsung S24ai 기능에 대한 자세한 소개 Samsung S24ai 기능에 대한 자세한 소개 Jun 24, 2024 am 11:18 AM

2024년은 AI 휴대폰 원년이다. AI 스마트 기술을 탑재해 휴대폰을 더욱 효율적이고 편리하게 사용할 수 있는 휴대폰이 늘어나고 있다. 최근 연초 출시된 갤럭시 S24 시리즈에서는 제너레이티브 AI 경험이 다시 한 번 향상됐다. 자세한 기능 소개는 아래에서 살펴보자. 1. 생성적 AI의 강력한 강화 Samsung Galaxy S24 시리즈는 Galaxy AI의 강화를 통해 많은 지능형 애플리케이션을 제공했습니다. 이러한 기능은 Samsung One UI6.1과 긴밀하게 통합되어 사용자가 언제든지 편리하고 지능적인 경험을 얻을 수 있습니다. 휴대전화의 성능과 사용 편의성을 향상시킵니다. 갤럭시 S24 시리즈가 개척한 원 앤 검색 기능은 사용자가 길게 누르기만 하면 되는 기능 중 하나입니다.

Win11 영어 21996을 중국어 간체 22000으로 업그레이드하는 방법_Win11 영어 21996을 중국어 간체 22000으로 업그레이드하는 방법 Win11 영어 21996을 중국어 간체 22000으로 업그레이드하는 방법_Win11 영어 21996을 중국어 간체 22000으로 업그레이드하는 방법 May 08, 2024 pm 05:10 PM

먼저 시스템 언어를 중국어 간체 표시로 설정하고 다시 시작해야 합니다. 물론 이전에 표시 언어를 중국어 간체로 변경했다면 이 단계를 건너뛰어도 됩니다. 다음으로 레지스트리 조작을 시작하여 regedit.exe를 실행하고 왼쪽 탐색바 또는 상단 주소 표시줄의 HKEY_LOCAL_MACHINESYSTEMCurrentControlSetControlNlsLanguage로 직접 이동한 후 InstallLanguage 키 값과 Default 키 값을 0804로 수정합니다(영어 en-로 변경하려는 경우). 먼저 시스템 표시 언어를 en-us로 설정하고 시스템을 다시 시작한 다음 모든 항목을 0409로 변경해야 합니다. 이 시점에서 시스템을 다시 시작해야 합니다.

C++ 동시 프로그래밍: 스레드 종료 및 취소를 수행하는 방법은 무엇입니까? C++ 동시 프로그래밍: 스레드 종료 및 취소를 수행하는 방법은 무엇입니까? May 06, 2024 pm 02:12 PM

C++의 스레드 종료 및 취소 메커니즘은 다음과 같습니다. 스레드 종료: std::thread::join()은 대상 스레드가 실행을 완료할 때까지 현재 스레드를 차단합니다. std::thread::detach()는 스레드 관리에서 대상 스레드를 분리합니다. 스레드 취소: std::thread::request_termination()은 대상 스레드에 실행을 종료하도록 요청합니다. std::thread::get_id()는 대상 스레드 ID를 획득하고 std::terminate()와 함께 사용하여 대상을 즉시 종료할 수 있습니다. 실. 실제 전투에서 request_termination()은 스레드가 종료 시점을 결정하도록 허용하고, Join()은 이를 메인 라인에서 보장합니다.

Win11에서 다운로드한 업데이트 파일 찾는 방법_Win11에서 다운로드한 업데이트 파일 위치 공유 Win11에서 다운로드한 업데이트 파일 찾는 방법_Win11에서 다운로드한 업데이트 파일 위치 공유 May 08, 2024 am 10:34 AM

1. 먼저 바탕화면의 [내 PC] 아이콘을 더블클릭하여 엽니다. 2. 그런 다음 마우스 왼쪽 버튼을 더블 클릭하여 [C 드라이브]로 들어갑니다. 일반적으로 시스템 파일은 C 드라이브에 자동으로 저장됩니다. 3. 그리고 C 드라이브에 있는 [windows] 폴더를 찾아 더블클릭하여 들어갑니다. 4. [windows] 폴더 진입 후, [SoftwareDistribution] 폴더를 찾아주세요. 5. 진입 후, win11 다운로드 및 업데이트 파일이 모두 들어있는 [다운로드] 폴더를 찾아주세요. 6. 이 파일을 삭제하려면 이 폴더에서 직접 삭제하면 됩니다.

See all articles