Home > Backend Development > C#.Net Tutorial > Raft implementation in c language (with code)

Raft implementation in c language (with code)

烟雨青岚
Release: 2020-06-18 17:44:57
forward
6567 people have browsed it

Raft implementation in c language (with code)

C language Raft implementation (with code)

1. Introduction

This article introduces a simple Raft implementation. If you have read the Raft paper, you will find it easier to read this Raft implementation, because the details of the implementation are described in great detail in the Raft paper. The engineering implementation is basically to re-express the description in the Raft paper in programming language. This is the biggest advantage of Raft over Paxos, that is, it is easy to understand and implement. The Raft implementation introduced in this article is coded in C language. Except for the log compression function, other features are implemented. The member change mechanism is also relatively simple, and only supports one configuration change at a time. For the principles of Raft, you can read the Raft paper and "Raft Understanding".

2. Basic concepts of Raft

2.1 Status

raft has three states: Leader, Candidate and Follower. The transitions between these three states are shown in the figure below. Only the Leader has the right to process customer requests and copy logs to Followers. Candidate is an intermediate state between Follower and Leader. When there is no Leader in the cluster, Follower enters the Candidate state and initiates a vote to the cluster. The Follower that obtains the majority of votes will become the Leader.

2.2 Messages

In order to improve the understandability of the protocol, Raft has simplified the setting of message types and only has the following two requests. [indent]
requestVote Initiates a voting request. Candidate's request when initiating a vote. It is received and processed by other Followers and Candidates in the cluster.
appendEntries Add log request. The request issued by the Leader when adding logs to the Follower. [/indent]

2.3 Term number

The term number term is used in the Raft protocol to indicate the old and new relationship of time. This term value within the term of each Leader is Unchanging, it is absolutely different among different Leaders and increases monotonically with time. If the term of request A is larger than that of request B, request B is out of date.

3.Raft implementation

3.1 Protocol

First introduce four important data structures, corresponding to the ones mentioned above requestVote and appendEntries request and reply.

/** requestVote 请求投票
   * 竞选者Candidate去竞选Leader时发送给其它node的投票请求。
   * 其它Leader或者Candidate收到term比自己大的投票请求时,会自动变成Follower*/
typedef struct
{
    /** 当前任期号,通过任期号的大小与其它Candidate竞争Leader */
    int term;

    /** 竞选者的id */
    int candidate_id;

    /** 竞选者本地保存的最新一条日志的index */
    int last_log_idx;

    /** 竞选者本地保存的最新一条日志的任期号*/
    int last_log_term;
} msg_requestvote_t;


/** 投票请求的回复response.
  * 该response主要是给返回某个node是否接收了Candidate的投票请求. */
typedef struct
{
    /** node的任期号,Candidate根据投票结果和node的任期号来更新自己的任期号 */
    int term;

    /** 投票结果,如果node给Candidate投票则为true */
    int vote_granted;
} msg_requestvote_response_t;

/**  添加日志请求.
  * Follower可以从该消息中知道哪些日志可以安全地提交到状态机FSM中去。
  * Leader可以将该消息作为心跳消息定期发送。
  * 旧的Leader和Candidate收到该消息后可能会自动变成Follower */
typedef struct
{
    /** Leader当前的任期号 */
    int term;

    /** 最新日志的前一条日志的index,用于Follower确认与Leader的日志完全一致 */
    int prev_log_idx;

    /** 最新日志的前一条日志的任期号term */
    int prev_log_term;

    /** leader当前已经确认提交到状态机FSM的日志索引index,这意味着Follower也可以安全地将该索引index以前的日志提交 */
    int leader_commit;

    /** 这条添加日志消息携带的日志条数,该实现中最多只有一条 */
    int n_entries;

    /** 这条添加日志消息中携带的日志数组 */
    msg_entry_t* entries;
} msg_appendentries_t;

/** 添加日志回复.
 * 旧的Leader或Candidate收到该消息会变成Follower */
typedef struct
{
    /** 当前任期号 */
    int term;

    /** node成功添加日志时返回ture,即prev_log_index和prev_log_term都比对成功。否则返回false */
    int success;

    /* 下面两个字段不是Raft论文中规定的字段:
    /* 用来优化日志追加过程,以加速日志的追加。Raft原文中的追加过程是一次只能追加一条日志*/

    /** 处理添加日志请求后本地的最大日志索引 */
    int current_idx;

    /** 从添加日志请求中接受的第一条日志索引 */
    int first_idx;
} msg_appendentries_response_t;
Copy after login

3.2 Two important abstractions

##raft_server_private_t This structure is the abstraction in Raft’s implementation. Saves the status and all data required during the operation of the Raft protocol.

typedef struct {
    /* 所有服务器比较固定的状态: */

    /* 服务器最后一次知道的任期号(初始化为 0,持续递增) */
    int current_term;

    /* 记录在当前分期内给哪个Candidate投过票,
       */
    int voted_for;

    /* 日志条目集;每一个条目包含一个用户状态机执行的指令,和收到时的任期号 */
    void* log;

    /* 变动比较频繁的变量: */

    /* 已知的最大的已经被提交的日志条目的索引值 */
    int commit_idx;

    /* 最后被应用到状态机的日志条目索引值(初始化为 0,持续递增) */
    int last_applied_idx;

    /* 三种状态:follower/leader/candidate */
    int state;

    /* 计时器,周期函数每次执行时会递增改值 */
    int timeout_elapsed;

    raft_node_t* nodes;
    int num_nodes;

    int election_timeout;
    int request_timeout;

    /* 保存Leader的信息,没有Leader时为NULL */
    raft_node_t* current_leader;

    /* callbacks,由调用该raft实现的调用者来实现,网络IO和持久存储
     * 都由调用者在callback中实现 */
    raft_cbs_t cb;
    void* udata;

    /* 自己的信息 */
    raft_node_t* node;

    /* 该raft实现每次只进行一个服务器的配置更改,该变量记录raft server
     * 是否正在进行配置更改*/
    int voting_cfg_change_log_idx;
} raft_server_private_t;
Copy after login

raft_node_private_t The abstract body of the machine node in the cluster contains information on other machines that needs to be saved during the running of the raft protocol

typedef struct
{
    void* udata;  /*一般保存与其它机器的连接信息,由使用者决定怎么实现连接*/

    int next_idx; /*对于每一个服务器,需要发送给他的下一个日志条目的索引值(初始化为领导人最后索引值加一)*/
    int match_idx; /*对于每一个服务器,已经复制给他的日志的最高索引值*/

    int flags; /*有三种取值,是相或的关系 1:该机器有给我投票 2:该机器有投票权  3: 该机器有最新的日志*/

    int id; /*机器对应的id值,这个每台机器在全局都是唯一的*/
} raft_node_private_t;
Copy after login

3.3 Raft protocol Process

Periodic function Raft needs to do some things periodically. For example, the Leader needs to periodically append logs to other servers, so that servers with lagging logs have a chance to catch up; All servers need to periodically apply confirmed commit logs to the state machine, etc. raft_periodic function is a function that is called periodically in the raft implementation, and the calling period is 1000ms. The machine will do different things in this function in different states. Leader periodically synchronizes logs to Follower. The Follower periodically detects whether it has not received a heartbeat packet from the Leader within a specific period of time. If so, it becomes a Candidate and starts voting for the Leader. Both Leader and Follower will periodically commit submitted logs to the state machine FSM.

/** raft周期性执行的函数,实现raft中的定时器以及定期应用日志到状态机
  */
int raft_periodic(raft_server_t* me_, int msec_since_last_period)
{
    raft_server_private_t* me = (raft_server_private_t*)me_;

    /* 选举计时器;Follower每次收到Leader的心跳后会重置清0,Leader每次发送日志也会清0 */
    me->timeout_elapsed += msec_since_last_period;

    /* Leader周期性地向Follower同步日志 */
    if (me->state == RAFT_STATE_LEADER) 
    {
        if (me->request_timeout <= me->timeout_elapsed)
            raft_send_appendentries_all(me_);
    }
    /* Follower检测选举计时器是否超时 */
    else if (me->election_timeout <= me->timeout_elapsed)
    {
        if (1 < me->num_nodes)
            raft_election_start(me_);
    }

    /* 周期性地将已经确认commit的日志应用到状态机FSM */
    if (me->last_applied_idx < me->commit_idx)
        if (-1 == raft_apply_entry(me_))
            return -1;

    return 0;
}
Copy after login

Become a candidate Candidate Each server in the cluster has an election timer. When a server does not receive a heartbeat from the Leader within the timer timeout period, the cluster is considered If there is no leader in the server or the leader is down, the server will become a candidate, and then initiate a vote to run for leader. The following raft_become_candidate function is the function for the server to become a candidate. The function mainly does the following things:

Increase the current term number (currentTerm)


Vote for yourself


Reset the election timeout timer


Send The RPC requesting voting is sent to all other servers

/** Follower成为Candidate执行的函数
  */
void raft_become_candidate(raft_server_t* me_)
{
    raft_server_private_t* me = (raft_server_private_t*)me_;
    int i;

    /*自增当前的任期号;给自己投票,设置自己的状态为CANDIDATE*/
    raft_set_current_term(me_, raft_get_current_term(me_) + 1);
    for (i = 0; i < me->num_nodes; i++)
        raft_node_vote_for_me(me->nodes[i], 0);
    raft_vote(me_, me->node);
    me->current_leader = NULL;
    raft_set_state(me_, RAFT_STATE_CANDIDATE);

    /* 重置选举超时计时器。为了防止多个Candidate竞争,将下一次发起投票的时间间隔设置成随机值*/
    /* TODO: this should probably be lower */
    me->timeout_elapsed = rand() % me->election_timeout;

    /*发送请求投票的 RPC 给其他所有服务器*/
    for (i = 0; i < me->num_nodes; i++)
        if (me->node != me->nodes[i] && raft_node_is_voting(me->nodes[i]))
            raft_send_requestvote(me_, me->nodes[i]);
}
Copy after login

Processing voting requestsThe logic of processing voting requests is mainly to determine whether to agree to vote. The basis for judgment is the term number and log information in the request The level of freshness and age, and whether you have voted for other servers with the same term number. If you have voted before, you cannot vote again. Each person only has one vote.

If term > currentTerm, switch to Follower mode.

The server that receives the voting request here may be a Leader with poor network conditions or a Candidate that has not yet had time to issue a voting request. After they receive a request with a newer term number than their own, they must unconditionally become Follower to ensure that only one Leader exists

如果term < currentTerm返回false。请求中的term比自己的term还要小,说明是一个过时的请求,则不给它投票返回false。

如果 term == currentTerm,请求中的日志信息不比本地日志旧,并且尚未给其它Candidate投过票,那么就投票给他

/** 处理投票请求 
*/
int raft_recv_requestvote(raft_server_t* me_,
                          raft_node_t* node,
                          msg_requestvote_t* vr,
                          msg_requestvote_response_t *r)
{
    raft_server_private_t* me = (raft_server_private_t*)me_;

    /*如果请求中term > 本地currentTerm, 则转为Follower模式*/
    if (raft_get_current_term(me_) < vr->term)
    {
        raft_set_current_term(me_, vr->term);
        raft_become_follower(me_);
    }

    /*如果需要投票,则回复true,即将r->vote_granted = 1;*/
    if (__should_grant_vote(me, vr))
    {
        assert(!(raft_is_leader(me_) || raft_is_candidate(me_)));

        /*同意投票--本地记录给哪个服务器投了票,并设置response中的vote_granted为1*/
        raft_vote_for_nodeid(me_, vr->candidate_id);
        r->vote_granted = 1;

        /* there must be in an election. */
        me->current_leader = NULL;

        me->timeout_elapsed = 0;
    }
    else
        r->vote_granted = 0;

    __log(me_, node, "node requested vote: %d replying: %s",
          node, r->vote_granted == 1 ? "granted" : "not granted");

    /*更新本地保存的任期号,与请求中的保持一致*/
    r->term = raft_get_current_term(me_);
    return 0;
}

/** 检查是否满足投票的条件 
*/
static int __should_grant_vote(raft_server_private_t* me, msg_requestvote_t* vr)
{
    /**请求中的任期号term比本地term要小,不给投票*/
    if (vr->term < raft_get_current_term((void*)me))
        return 0;

    /*如果已经投过票了,返回false*/
    /* TODO: if voted for is candiate return 1 (if below checks pass) */
    if (raft_already_voted((void*)me))
        return 0;

    /* 下面代码检查请求中日志信息是否比本地日志新*/

    /* 获取本地最新的日志索引 */
    int current_idx = raft_get_current_idx((void*)me);

    /* 本地日志为空,请求中的日志信息绝对比本地要新,返回true */
    if (0 == current_idx)
        return 1;

    /* 如果本地最新日志中的任期号比请求中的last_log_term要小,则返回true */
    raft_entry_t* e = raft_get_entry_from_idx((void*)me, current_idx);
    if (e->term < vr->last_log_term)
        return 1;

    /* 本地最新日志中的任期号与请求中的last_log_term相等,则比较日志索引,索引比较大的说明日志比较新*/
    if (vr->last_log_term == e->term && current_idx <= vr->last_log_idx)
        return 1;

    /*果本地最新日志中的任期号比请求中的last_log_term要大,则返回false */
    return 0;
}
Copy after login
  • 收到投票回复 Candidate收到投票回复后,检查是否给自己投了票,如果投了票则统计当前收到的投票总数,超过一半则成为Leader
/** 处理投票恢复 
*/
int raft_recv_requestvote_response(raft_server_t* me_,
                                   raft_node_t* node,
                                   msg_requestvote_response_t* r)
{
    raft_server_private_t* me = (raft_server_private_t*)me_;

    __log(me_, node, "node responded to requestvote status: %s",
          r->vote_granted == 1 ? "granted" : "not granted");

    /* Oh~我不是Candidate,直接返回 */
    if (!raft_is_candidate(me_))
    {
        return 0;
    }
    /* response中的任期号比自己的大,说明自己的term已经过时,无条件转为Follower */
    else if (raft_get_current_term(me_) < r->term)
    {
        raft_set_current_term(me_, r->term);
        raft_become_follower(me_);
        return 0;
    }
    /* response中的任期号比自己小,说明收到了一个过时的response,忽略即可。
     * 当网络比较差的时候容易出现这种情况 */
    else if (raft_get_current_term(me_) != r->term)
    {
        return 0;
    }

    __log(me_, node, "node responded to requestvote: %d status: %s ct:%d rt:%d",
          node, r->vote_granted == 1 ? "granted" : "not granted",
          me->current_term,
          r->term);

    /* Yeah~给我投票了 */
    if (1 == r->vote_granted)
    {
        /* 记录给自己投票的服务器信息 */
        if (node)
            raft_node_vote_for_me(node, 1);
        int votes = raft_get_nvotes_for_me(me_);
        /* 如果给自己投票的服务器超过了总数的一般,则成为Leader */
        if (raft_votes_is_majority(me->num_nodes, votes))
            raft_become_leader(me_);
    }

    return 0;
}
Copy after login

添加日志请求 Leader除了在收到客户端请求后会发起添加日志请求,还会在周期函数raft_periodic中发起添加日志请求。Leader维护了所有Follower的日志情况,如果Follower的日志比较旧,就会周期性地给它发送添加日志请求。关于日志怎么同步和保持一致性的原理,可以阅读raft论文5.3节--日志复制。简单地说就是,Leader在给Follower发送一条日志N时,会顺带将前一条日志M的信息也带过去。Follower会检查请求中前一条日志M的信息与本地相同索引的日志是否吻合,如果吻合说明本地在M以前的所有日志都是和Leader一致的(raft论文中使用递归法证明,因为所有日志都是按照同样的规则添加的)。

/** 给某个Follower发送添加日志请求
  */
int raft_send_appendentries(raft_server_t* me_, raft_node_t* node)
{
    raft_server_private_t* me = (raft_server_private_t*)me_;

    assert(node);
    assert(node != me->node);

    /* callback函数,实现网络发送功能,由使用该raft实现的调用者实现网络IO功能*/
    if (!(me->cb.send_appendentries))
        return -1;

    /* 初始化请求的参数-- 当前任期号、最新日志索引 */
    msg_appendentries_t ae;
    ae.term = me->current_term;
    ae.leader_commit = raft_get_commit_idx(me_);
    ae.prev_log_idx = 0;
    ae.prev_log_term = 0;
    ae.n_entries = 0;
    ae.entries = NULL;

    /* 根据记录的Follower的日志信息,获取要发给Follower的下一条日志索引 */
    int next_idx = raft_node_get_next_idx(node);

    msg_entry_t mety;

    /* 添加下一条日志的内容*/
    raft_entry_t* ety = raft_get_entry_from_idx(me_, next_idx);
    if (ety)
    {
        mety.term = ety->term;
        mety.id = ety->id;
        mety.type = ety->type;
        mety.data.len = ety->data.len;
        mety.data.buf = ety->data.buf;
        ae.entries = &mety;
        // TODO: we want to send more than 1 at a time
        ae.n_entries = 1;
    }

    /* 添加要添加日志的前一条日志信息,用来做日志一致性检查,关于怎么保证
     * Leader和Follower日志的一致性,可参看raft论文第5.3节--日志复制*/
    if (1 < next_idx)
    {
        raft_entry_t* prev_ety = raft_get_entry_from_idx(me_, next_idx - 1);
        ae.prev_log_idx = next_idx - 1;
        if (prev_ety)
            ae.prev_log_term = prev_ety->term;
    }

    __log(me_, node, "sending appendentries node: ci:%d t:%d lc:%d pli:%d plt:%d",
          raft_get_current_idx(me_),
          ae.term,
          ae.leader_commit,
          ae.prev_log_idx,
          ae.prev_log_term);

    /* 调用callback发送请求,callback由该raft实现的调用者来实现*/
    me->cb.send_appendentries(me_, me->udata, node, &ae);

    return 0;
}
Copy after login

处理添加日志请求 所有的服务器都有可能收到添加日志请求,比如过时的Leader和Candidate以及正常运行的Follower。处理添加日志请求的过程主要就是验证请求中的日志是否比本地日志新的过程。

/*
1. 处理任期号的三种情况(大于等于和小于)
2. 处理prev log不一致的情况,返回包中告诉Leader自己目前的log情况
3. 处理添加日志成功的情况-- 保存新日志并更新current_idx和commit_idx
*/
int raft_recv_appendentries(
    raft_server_t* me_,
    raft_node_t* node,
    msg_appendentries_t* ae,
    msg_appendentries_response_t *r
    )
{
    raft_server_private_t* me = (raft_server_private_t*)me_;

    me->timeout_elapsed = 0;

    if (0 < ae->n_entries)
        __log(me_, node, "recvd appendentries from: %lx, t:%d ci:%d lc:%d pli:%d plt:%d #%d",
              node,
              ae->term,
              raft_get_current_idx(me_),
              ae->leader_commit,
              ae->prev_log_idx,
              ae->prev_log_term,
              ae->n_entries);

    r->term = me->current_term;

    /* 处理任期号 */
    /* currentTerm == ae->term,当自己是Candidate时收到term与自己相等的请求,
     * 说明已经有其它Candidate成为了Leader,自己无条件变成Follower*/
    if (raft_is_candidate(me_) && me->current_term == ae->term)
    {
        me->voted_for = -1;
        raft_become_follower(me_);
    }
    /* currentTerm < ae->term. 自己的任期号已经落后Leader,无条件成为Follower,并且更新自己的term*/
    else if (me->current_term < ae->term)
    {
        raft_set_current_term(me_, ae->term);
        r->term = ae->term;
        raft_become_follower(me_);
    }
    /* currentTerm > ae->term. 说明收到一个过时Leader的请求,直接回包告诉它最新的term */
    else if (ae->term < me->current_term)
    {
        /* 1. Reply false if term < currentTerm (§5.1) */
        __log(me_, node, "AE term %d is less than current term %d",
              ae->term, me->current_term);
        goto fail_with_current_idx;
    }


    /* NOTE: the log starts at 1 */
    /* 检查请求中prev_log_idx日志的term与本地对应索引的term是否一致 */
    if (0 < ae->prev_log_idx)
    {
        raft_entry_t* e = raft_get_entry_from_idx(me_, ae->prev_log_idx);

        /*  本地在prev_log_idx位置还不存在日志,说明日志已经落后Leader了,返回false
         *    并告诉leader自己当前日志的位置,这样Leader知道下一次该发哪条日志过来了*/
        if (!e)
        {
            __log(me_, node, "AE no log at prev_idx %d", ae->prev_log_idx);
            goto fail_with_current_idx;
        }
        if (raft_get_current_idx(me_) < ae->prev_log_idx)
            goto fail_with_current_idx;

        /* 本地在prev_log_idx位置的日志的term与请求中的prev_log_term不一致,
         * 此时本地无条件删除本地与请求不一致的日志,并向Leader返回删除后的日志位置*/
        if (e->term != ae->prev_log_term)
        {
            __log(me_, node, "AE term doesn&#39;t match prev_term (ie. %d vs %d) ci:%d pli:%d",
                  e->term, ae->prev_log_term, raft_get_current_idx(me_), ae->prev_log_idx);
            assert(me->commit_idx < ae->prev_log_idx);
            /* Delete all the following log entries because they don&#39;t match */
            log_delete(me->log, ae->prev_log_idx);
            r->current_idx = ae->prev_log_idx - 1;
            goto fail;
        }
    }

    /* 本地的日志比Leader要多。当本地服务器曾经是Leader,收到了很多客户端请求
     * 并还没来得及同步时会出现这种情况。这时本地无条件删除比Leader多的日志 */
    if (ae->n_entries == 0 && 0 < ae->prev_log_idx && ae->prev_log_idx + 1 < raft_get_current_idx(me_))
    {
        assert(me->commit_idx < ae->prev_log_idx + 1);
        log_delete(me->log, ae->prev_log_idx + 1);
    }

    r->current_idx = ae->prev_log_idx;

    /* 下面for循环跳过请求中已经在本地添加过的日志*/
    int i;
    for (i = 0; i < ae->n_entries; i++)
    {
        msg_entry_t* ety = &ae->entries[i];
        int ety_index = ae->prev_log_idx + 1 + i;
        raft_entry_t* existing_ety = raft_get_entry_from_idx(me_, ety_index);
        r->current_idx = ety_index;
        if (existing_ety && existing_ety->term != ety->term)
        {
            assert(me->commit_idx < ety_index);
            log_delete(me->log, ety_index);
            break;
        }
        else if (!existing_ety)
            break;
    }

    /* 下面for循环将请求中确认的新日志添加到本地 */
    for (; i < ae->n_entries; i++)
    {
        int e = raft_append_entry(me_, &ae->entries[i]);
        if (-1 == e)
            goto fail_with_current_idx;

        r->current_idx = ae->prev_log_idx + 1 + i;
    }

    /* 4. 请求中携带了Leader已经提交到状态机的日志索引,本地同样也更新这个索引,将其
     *    设置为本地最大日志索引和leader_commit中的较小者*/
    if (raft_get_commit_idx(me_) < ae->leader_commit)
    {
        int last_log_idx = max(raft_get_current_idx(me_), 1);
        raft_set_commit_idx(me_, min(last_log_idx, ae->leader_commit));
    }

    /* 更新Leader信息 */
    me->current_leader = node;

    r->success = 1;
    r->first_idx = ae->prev_log_idx + 1;
    return 0;

fail_with_current_idx:
    r->current_idx = raft_get_current_idx(me_);
fail:
    r->success = 0;
    r->first_idx = 0;
    return -1;
}
Copy after login

处理添加日志请求回复 Leader收到添加日志回复后,可以知道下面这些信息:

自己是不是已经过时(current_term < response->term即为过时)

follower是否成功添加日志,如果添加失败,则减小发给follower的日志索引nextIndex再重试;如果添加成功则更新本地记录的follower日志信息,并检查日志是否最新,如果不是最新则继续发送添加日志请求。

新机器的日志添加,详见3.4节-- 成员变更

/** 处理添加日志请求回复
  * /
int raft_recv_appendentries_response(raft_server_t* me_,
                                     raft_node_t* node,
                                     msg_appendentries_response_t* r)
{
    raft_server_private_t* me = (raft_server_private_t*)me_;

    __log(me_, node,
          "received appendentries response %s ci:%d rci:%d 1stidx:%d",
          r->success == 1 ? "SUCCESS" : "fail",
          raft_get_current_idx(me_),
          r->current_idx,
          r->first_idx);

    /* 过时的回复 -- 忽略 */
    if (r->current_idx != 0 && r->current_idx <= raft_node_get_match_idx(node))
        return 0;

    /* oh~我不是Leader */
    if (!raft_is_leader(me_))
        return -1;

    /* 回复中的term比自己的要大,说明自己是一个过时的Leader,无条件转为Follower */
    if (me->current_term < r->term)
    {
        raft_set_current_term(me_, r->term);
        raft_become_follower(me_);
        return 0;
    }
    /* 过时的回复,网络状况不好时会出现 */
    else if (me->current_term != r->term)
        return 0;

    /* stop processing, this is a node we don&#39;t have in our configuration */
    if (!node)
        return 0;

    /* 由于日志不一致导致添加日志不成功*/
    if (0 == r->success)
    {
        assert(0 <= raft_node_get_next_idx(node));

        /* 将nextIdex减*/
        int next_idx = raft_node_get_next_idx(node);
        assert(0 <= next_idx);
        /* Follower的日志数量还远远少于Leader,将nextIdex设为回复中的current_idx+1和Leader
         * 当前索引中较小的一个,一般回复中的current_idx+1会比较小*/
        if (r->current_idx < next_idx - 1)
            raft_node_set_next_idx(node, min(r->current_idx + 1, raft_get_current_idx(me_)));
        /* Follower的日志数量和Leader差不多,但是比对前一条日志时失败,这种情况将next_idx减1
         * 重试*/
        else
            raft_node_set_next_idx(node, next_idx - 1);

        /* 使用更新后的nextIdx重新发送添加日志请求 */
        raft_send_appendentries(me_, node);
        return 0;
    }

    assert(r->current_idx <= raft_get_current_idx(me_));

    /* 下面处理添加日志请求的情况 */
    /* 更新本地记录的Follower的日志情况 */
    raft_node_set_next_idx(node, r->current_idx + 1);
    raft_node_set_match_idx(node, r->current_idx);

    /* 如果是新加入的机器,则判断它的日志是否是最新,如果达到了最新,则赋予它投票权,
     * 这里逻辑的详细解释在第3.4节 -- 成员变更*/
    if (!raft_node_is_voting(node) &&
        -1 == me->voting_cfg_change_log_idx &&
        raft_get_current_idx(me_) <= r->current_idx + 1 &&
        me->cb.node_has_sufficient_logs &&
        0 == raft_node_has_sufficient_logs(node)
        )
    {
        raft_node_set_has_sufficient_logs(node);
        me->cb.node_has_sufficient_logs(me_, me->udata, node);
    }

    /* 如果一条日志回复成功的数量超过一半,则将日志提交commit,即允许应用到状态机 */
    int votes = 1; /* include me */
    int point = r->current_idx;
    int i;
    for (i = 0; i < me->num_nodes; i++)
    {
        if (me->node == me->nodes[i] || !raft_node_is_voting(me->nodes[i]))
            continue;

        int match_idx = raft_node_get_match_idx(me->nodes[i]);

        if (0 < match_idx)
        {
            raft_entry_t* ety = raft_get_entry_from_idx(me_, match_idx);
            /*如果follower已经添加了索引大于等于r->current_idx的日志,则vote加1*/
            if (ety->term == me->current_term && point <= match_idx)
                votes++;
        }
    }

    /* 投票数大于所有服务器的一半,则将日志提交 */
    if (me->num_nodes / 2 < votes && raft_get_commit_idx(me_) < point)
        raft_set_commit_idx(me_, point);

    /* 如果follower的日志还没有最新,那么继续发送添加日志请求 */
    if (raft_get_entry_from_idx(me_, raft_node_get_next_idx(node)))
        raft_send_appendentries(me_, node);

    /* periodic applies committed entries lazily */

    return 0;
}
Copy after login

3.3 成员变更

成员的变更都是以日志的形式下发的。添加的新成员分两阶段进行,第一阶段中新成员没有有投票权,但是有接收日志的权力;当它的日志同步到最新后就进入到第二阶段,由Leader赋予投票权,从而成为集群中完整的一员。删除成员相对比较简单,所有服务器收到删除成员的日志后,立马将该成员的信息从本地抹除。

添加成员过程

  1. 管理员向Leader发送添加成员命令

  2. Leader添加一条 RAFT_LOGTYPE_ADD_NONVOTING_NODE日志,即添加没有投票权的服务器。该日志与其它普通日志一样同步给集群中其它服务器。收到该日志的服务器在本地保存该新成员的信息。

  3. 当新成员的日志同步到最新后,Leader添加一条 RAFT_LOGTYPE_ADD_NODE日志,即有投票权的服务器,同样地,该日志与其它普通日志一样同步给集群中其它服务器。收到该日志的服务器在本地保存该新成员的信息,以后的投票活动会将新成员考虑进去。

删除成员过程

  1. 管理员向Leader发送删除成员命令。

  2. Leader添加一条 RAFT_LOGTYPE_REMOVE_NODE 日志,并跟普通日志一样同步给其它服务器。收到该日志的服务器立即将被成员信息从本地删除。

感谢大家的阅读,希望大家收益多多。

推荐教程:《C语言

The above is the detailed content of Raft implementation in c language (with code). For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:itbbs.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template