Redis におけるコマンドのアトミック性の詳細な分析

WBOY
リリース: 2022-06-01 11:58:04
転載
3857 人が閲覧しました

この記事では、Redis に関する関連知識を提供します。主に、同時処理ソリューション、プログラミング モデル、マルチ IO など、アトミック操作におけるコマンドのアトミック性に関連する問題を紹介します。スレッドやコマンド単体の関連内容ですので、皆様のお役に立てれば幸いです。

Redis におけるコマンドのアトミック性の詳細な分析

#推奨される学習:

Redis ビデオ チュートリアル

Redis が同時アクセスに対処する方法

Redis で同時実行性を処理するためのスキーム

ビジネスでは、フラッシュ セール ビジネスや在庫操作など、同時性の高いビジネス シナリオを処理するために Redis を使用することがあります。 。 。

まず、同時実行シナリオでどのような問題が発生するかを分析しましょう

同時実行の問題は主にデータ変更で発生します。クライアントがデータを変更する場合、通常は次の 2 つの手順に分かれます:

1. クライアントは最初にデータをローカルで読み取り、ローカルで変更します;

2. クライアントはデータを変更した後、それを Redis に書き込みます。

このプロセスを

Read-Modify-Writeback 操作 (Read-Modify-Write、RMW 操作と呼ばれます) と呼びます。クライアントが RMW 操作を同時に実行する場合、read-modify-writeback がアトミック操作であることを確認する必要があります。コマンド操作を実行する場合、他のクライアントは現在のデータを操作できません。

Wrong Chestnut:

ページへの訪問数をカウントします。ページが更新されるたびに訪問数は 1 になります。ここでは、Redis を使用して訪問数を記録します。

read-modify-writeback 操作がアトミック操作ではない場合、次の図に示すような問題が発生する可能性があります。クライアント 2 も、次の操作の途中です。クライアント 1. Redis の値を取得し、その値に対して 1 の操作も実行します。これにより、最終データにエラーが発生します。

上記の状況の場合、通常、解決するには 2 つの方法があります:

1. Redis を使用して分散ロックを実装し、そのロックを使用して重要なリソースを一度に 1 つのスレッドだけが操作できるようにします。

2. 操作コマンドの原子性を実現します。

    Li Ru、上記のエラー例の場合、
  • Read-Modify-Writeback がアトミック コマンドである場合、このコマンドは操作中に何の違いもありません。運用データも同時に取得できるため、上記の問題を回避できます。
以下は、アトミック性とロックの 2 つの側面からの同時アクセスの問題の処理の詳細な分析です。

アトミック性

同時実行制御に必要なクリティカル セクション コードの相互排他的実行を実装するには、Redis でコマンドのアトミック性を使用する場合、次の 2 つの処理方法があります:

1. アトミック性を利用して単一のコマンドを使用するRedis;

2. 複数の操作を Lua スクリプトに記述し、単一の Lua スクリプトをアトミックに実行します。

Redis の原子性について議論するときは、まず Redis で使用されるプログラミング モデルについて説明しましょう

Redis のプログラミング モデル

Redis で使用されるプログラミング モデルReactor モデル、Reactor はノンブロッキング I/O モデルです。Unix の I/O モデルを見てみましょう。

Unix の I/O モデル

オペレーティング システム上の I/O は、ユーザー空間とカーネル空間の間のデータ対話であるため、通常、I/O 操作には次のものが含まれます。次の 2 つの手順:

1. ネットワーク データがネットワーク カードに到着するまで待機 (準備完了)/ネットワーク カードが書き込み可能になるまで待機 (書き込み準備完了) –> カーネル バッファへの読み取り/書き込み。

2 、カーネル バッファからデータをコピー –> ユーザー空間 (読み取り)/ユーザー空間からデータをコピー –> カーネル バッファ (書き込み);

5 つの基本的な I/O モデルがありますUnix の場合

    ブロッキング I/O;
  • ノンブロッキング I/O;
  • I/O 多重化;
  • シグナル駆動 I/ O;
  • 非同期 I/O;

I/O モデルが同期か非同期かを判断するには、主に 2 番目のステップに依存します。 : データ ユーザー空間とカーネル空間の間のコピーは現在のプロセスをブロックしますか? ブロックされる場合は同期 I/O、そうでない場合は非同期 I/O です。

ここでは主に次の 3 つの I/O モデルに分かれています。

    ブロッキング I/O;
ユーザー プログラムが read を実行すると、スレッドはBlockとなり、カーネルデータが準備できるまで待ち、カーネルバッファからアプリケーションバッファへデータをコピーし、コピー処理が完了するとreadに戻ります。

ブロックされて待たされるのは、「カーネルデータの準備が完了する」と「カーネルモードからユーザーモードにデータがコピーされる」の2つの処理です。

  • ノンブロッキング同期 I/O;

データの準備ができていない場合、ノンブロッキング読み取りリクエストはすぐに返され、実行を続行できます。このとき、アプリケーションはデータの準備ができるまでカーネルにクエリを実行し、カーネルはデータをアプリケーション バッファにコピーし、読み取り呼び出しで結果を取得できるようになります。

ここでの最後の読み取り呼び出し、つまりデータを取得するプロセスは同期プロセスであり、待機する必要があります。ここでの同期とは、カーネル状態データをユーザープログラムのキャッシュ領域にコピーする処理を指します。

  • ノンブロッキング非同期 I/O;

非同期 I/O を開始し、すぐに戻ります。カーネルは、データをカーネル空間からユーザー空間に自動的にコピーします。このコピー プロセスは、また、これは非同期であり、カーネルによって自動的に完了するため、前の同期操作とは異なり、アプリケーションがコピー アクションを積極的に開始する必要はありません。

たとえば、食堂に食事に行く場合、あなたはアプリケーションのようなもので、食堂はオペレーティング システムのようなものです。

I/O のブロック たとえば、食堂に食事に行きましたが、食堂の食べ物はまだ準備ができておらず、そこでずっと待っていました。食堂のおばさんが持ってくるまで待ちます。食べ物は運ばれてきますが (データ準備プロセス)、おばさんが食べ物 (カーネル空間) を弁当箱 (ユーザー空間) に入れるのをまだ待たなければなりません。これら 2 つの後、プロセスを終了しても構いません。

ノンブロッキング I/O たとえば、食堂に行って、おばに食べ物の準備ができているかどうか尋ねると、彼女は食べ物の準備ができているかどうかを教えてくれたので、あなたはその場を去りました。もう一度食堂に来て聞いてみると、おばちゃんは準備ができたと言うので、お弁当箱に食べ物を入れるのを手伝ってくれます。このプロセスを待たなければなりません。

非同期 I/O たとえば、食堂のおばちゃんに料理の準備をお願いして、お弁当箱に詰めて、お弁当を届けてもらいます。プロセス。

Web サービスでは、通常、Web リクエストを処理するための 2 つのアーキテクチャがあります。つまり、スレッドベースのアーキテクチャ (スレッドベースのアーキテクチャ)、イベント駆動型のアーキテクチャ(イベント駆動型モデル)

スレッドベース アーキテクチャ(スレッドベース アーキテクチャ)

スレッドベース アーキテクチャ(スレッドベース アーキテクチャ): これは理解しやすいです。サーバーがリクエストを処理するとき、各リクエストには処理用の独立したスレッドが割り当てられます。

各リクエストには独立したスレッドが割り当てられるため、単一のスレッドがブロックされても他のスレッドには影響せず、プログラムの応答速度が向上します。

欠点は、接続とスレッドの間に常に 1 対 1 の関係があることです。接続が常にキープアライブ状態にある長い接続の場合、大量のワーカー スレッドが発生します。ファイル システム アクセスやネットワークなどのアイドル状態で待機すること。さらに、数百または数千の接続により、同時スレッドが大量のメモリ スタック スペースを浪費する可能性があります。

イベント駆動型アーキテクチャ (イベント駆動型モデル)

イベント駆動型アーキテクチャは、イベント プロデューサーとイベント コンシューマーで構成され、疎結合の分散駆動型アーキテクチャです。では、プロデューサーはアプリケーションによって生成されたイベントを収集し、システムの応答を待たずにダウンストリーム システムにルーティングする前に、イベントに必要な処理をリアルタイムで実行します。ダウンストリームのイベント コンシューマー グループは、イベント メッセージを受信して​​非同期に処理します。

イベント駆動型アーキテクチャには次の利点があります:

  • 結合の削減;

イベント プロデューサーとサブスクライバの結合を削減します。イベントプロデューサーは、イベントの発生に注意を払うだけでよく、イベントがどのように処理され、どのサブスクライバに配布されるかに注意を払う必要はありません。リンクに障害が発生しても、他のビジネスの通常の動作には影響しません。

  • 非同期実行;

イベント駆動型アーキテクチャは、非同期シナリオに適しています。需要のピーク期間中であっても、さまざまなソースからのイベントが収集され、イベント バスに保持されます。システムの輻輳や過剰なリソースを発生させずに、段階的に配信イベントを分散します。

  • スケーラビリティ;

イベント駆動型アーキテクチャのルーティングおよびフィルタリング機能は、サービスの分割をサポートし、拡張とルート配布を容易にします。

Reactor モードと Proactor モードは、どちらも イベント駆動型アーキテクチャ (イベント駆動型モデル) の実装方法です。

Reactor モードの詳細な分析は次のとおりです。

リアクター モードは、1 つ以上の入力を通じてサービス プロセッサに同時に渡されるサービス リクエストのイベント駆動型処理モードを指します。

ネットワーク IO 接続イベント、読み取りイベント、書き込みイベントの処理。 Reactor

  • reactor では 3 種類のロールが導入されています: イベントの監視と割り当て、接続イベントはアクセプターに渡され、読み取りおよび書き込みイベントはハンドラーに渡されます。
  • acceptor : 接続リクエストを受信します。接続を受信した後、ネットワーク接続上で後続の読み取りおよび書き込みイベントを処理するハンドラーが作成されます。
  • handler: 読み取りおよび書き込みイベントを処理します。

リアクター モデルは 3 つのカテゴリに分類されます:

  • シングルスレッド Reactor モード;

接続を確立し (アクセプター)、受け入れ、読み取り、書き込みイベントを監視し (リアクター)、プロセス イベント (ハンドラー) ) は単一のスレッドのみを使用します。

  • マルチスレッド Reactor モード;

シングル スレッドとは異なります。スレッド モード、ワーカー スレッド プールが追加され、非 I/O 操作が Reactor スレッドからワーカー スレッド プール (スレッド プール) に転送されて実行されます。

接続 (アクセプター) を確立し、イベントの受け入れ、読み取り、書き込み (リアクター) をリッスンし、スレッドを再利用します。

ワーカー スレッド プール: イベントを処理します (ハンドラー)。ワーカー スレッド プールは、データの準備ができた後のユーザー モード データの読み取りと書き込みを含むビジネス ロジックを実行します。

  • マスター/スレーブ Reactor モード;

複数の CPU を搭載したマシンの場合、システム リソースを最大限に活用するために、 Reactor は mainReactor と subReactor の 2 つの部分に分割されます。

mainReactor: サーバーソケットの監視を担当し、新しいネットワーク接続の確立を処理し、確立されたsocketChannelをsubReactorに登録するために使用されます。通常は1つのスレッドで処理できます。

subReactor: accept、read、write イベント (Reactor) を監視します。これには、データの準備ができるのを待機するときのカーネル モードでのデータの読み取りと書き込みが含まれます (通常はマルチスレッドを使用します)。

ワーカー スレッド: イベント処理 (Handler) は、subReactor と同じスレッドを使用することも、マルチスレッド Reactor のワーカー スレッド プールの処理方法と同様に、スレッド プールにすることもできます。上記のモード。

Proactor モード

reactor プロセスは Reactor モードと似ています

違いは、

  • Reactor は非-ブロッキング同期ネットワーク モードでは、読み取りおよび書き込み準備完了イベントが認識されます。

イベント (読み取り可能な Ready イベントなど) が検出されるたびに、アプリケーション プロセスは read メソッドをアクティブに呼び出してデータの読み取りを完了する必要があります。つまり、アプリケーション プロセスはアクティブにデータを受け取る必要があります。ソケット。キャッシュ内のデータはアプリケーション プロセスのメモリに読み込まれます。このプロセスは同期です。アプリケーション プロセスは、データを読み取った後でのみデータを処理できます。

  • Proactor は非同期ネットワーク モードで、完了した読み取りおよび書き込みイベントを認識します。

非同期の読み取りおよび書き込みリクエストを開始するときは、システム カーネルが自動的にサポートできるように、データ バッファー (結果データの保存に使用される) のアドレスとその他の情報を渡す必要があります。データの読み取りおよび書き込み作業を完了します。ここでの読み取りおよび書き込み作業全体はオペレーティング システムによって行われます。データの読み取りおよび書き込みのために Reactor のようにアプリケーション プロセスが積極的に read/write を開始する必要はありません。オペレーティング システムが読み取りおよび書き込み作業を完了すると、アプリケーション プロセスのプロセス データが直接通知されます。

したがって、Reactor は「イベントが発生すると、オペレーティング システムがアプリケーション プロセスに通知し、アプリケーション プロセスにそれを処理させる」と理解できます。一方、Proactor は、「イベントが発生すると、オペレーティング システムがアプリケーション プロセスにそれを処理させる」と理解できます。システムがそれを処理し、処理後にアプリケーション プロセスに通知します。」

実際の例を挙げると、Reactor モデルは、配達員が階下にいて、配達員があなたのコミュニティに到着したことを知らせる電話をしてくるということを意味します。配達員を取りに行くには自分で階下に行く必要があります。プロアクター モードでは、宅配業者が荷物を直接あなたの玄関まで届けてから、あなたに通知します。

Redis がシングル スレッドを選択する理由

Redis はシングル スレッドを使用しますが、これは次の考慮事項による可能性があります

1. Redis は純粋なメモリです。操作の速度は非常に速いため、操作のこの部分は通常パフォーマンスのボトルネックではありません。パフォーマンスのボトルネックはネットワーク I/O にあります。

2. 過度のコンテキスト切り替えオーバーヘッドを回避します。単一スレッドは、プロセス 頻繁なスレッド切り替えのオーバーヘッド;

3. 同期メカニズムのオーバーヘッドを回避します. マルチスレッドでは共有リソースへのアクセスが避けられません. このとき、通常のアプローチはロックです. マルチスレッドですが、シリアルアクセスとなります。つまり、マルチスレッド プログラミング モードが直面する共有リソースの同時アクセス制御の問題です。

4. シンプルで保守可能です。マルチスレッドでは、共有リソースへの同時アクセスを保護し、維持するための同期プリミティブも導入されます。コードの保守性と可読性が低下します。

Redis v6.0 バージョンより前の Redis のコア ネットワーク モデルは、常に典型的な単一 Reactor モデルでした。epoll/select/kqueue およびその他の多重化テクノロジを単一スレッドで使用していました。イベント (クライアント要求) はイベント ループ内で継続的に処理され、最後に応答データがクライアントに書き戻されます。

ここでは、Redis が単一スレッドを使用してタスクを処理する方法を示します

##イベント駆動型フレームワークはイベントをキャプチャして配布します

Redis のネットワーク フレームワークは Reactor モデルを実装し、イベント駆動型フレームワークが開発および実装されますそれ自体で。

イベント駆動型フレームワークのロジックは単純に次のとおりです。
  • イベントの初期化;
  • イベント キャプチャ;
  • 配布および処理のメイン ループ。

Redis のイベント駆動型フレームワークによって実装されたいくつかの主な関数を見てみましょう

// 执行事件捕获,分发和处理循环
void aeMain(aeEventLoop *eventLoop);
// 用来注册监听的事件和事件对应的处理函数。只有对事件和处理函数进行了注册,才能在事件发生时调用相应的函数进行处理。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData);
// aeProcessEvents 函数实现的主要功能,包括捕获事件、判断事件类型和调用具体的事件处理函数,从而实现事件的处理
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
ログイン後にコピー

aeMain をメイン ループとして使用して、継続的に実行します。イベントとキャプチャを監視します。イベント キャプチャを実現するために aeProcessEvents 関数が呼び出され、イベント タイプを決定し、特定のイベント処理関数を呼び出してイベント処理を実現します。

// https://github.com/redis/redis/blob/5.0/src/ae.c#L496
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

// https://github.com/redis/redis/blob/5.0/src/ae.c#L358
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
       ...
   if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
       ...
       //调用aeApiPoll函数捕获事件
       numevents = aeApiPoll(eventLoop, tvp);
       ...
    }
    ...
}
ログイン後にコピー

aeApiPoll を呼び出すことで、aeProcessEvents での IO イベントのキャプチャが完了していることがわかります。

aeApiPoll は I/O 多重化 API です。epoll_wait/select/kevent などのシステム コールに基づくカプセル化です。トリガーされる読み取りおよび書き込みイベントをリッスンして待機します。これはイベント ループであり、(イベント ループ) の中核となる機能は、イベント駆動型の動作の基礎となります。

Redis は、基盤となるオペレーティング システムによって提供される IO 多重化メカニズムに依存して、イベント キャプチャを実装し、新しい接続、読み取りおよび書き込みイベントが発生するかどうかを確認します。さまざまなオペレーティング システムに適応するために、Redis はさまざまなオペレーティング システムによって実装されるネットワーク IO 多重化機能を均一にカプセル化します。

// https://github.com/redis/redis/blob/5.0/src/ae.c#L49
#ifdef HAVE_EVPORT
#include "ae_evport.c"  // Solaris
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"   // Linux
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"  // MacOS
        #else
        #include "ae_select.c"  // Windows
        #endif
    #endif
#endif
ログイン後にコピー

ae_epoll.c: Linux の IO 多重化機能 epoll に対応;

ae_evport.c: Solaris の IO 多重化機能 evport に対応;

ae_kqueue.c : macOS または FreeBSD の IO 多重化機能 kqueue に対応;

ae_select.c: Linux (または Windows) の IO 多重化機能 select に対応。

クライアント接続応答

ソケットの読み取りイベントをリッスンし、クライアント接続要求が来たら、関数 acceptTcpHandler を使用してクライアントとの接続を確立します

Redis が起動されると、サーバー プログラムの main 関数は初期化のために initSever 関数を呼び出します。初期化プロセス中に、initServer 関数によって aeCreateFileEvent が呼び出され、監視対象のイベントと対応するイベント処理関数が登録されます。 。

// https://github.com/redis/redis/blob/5.0/src/server.c#L2036
void initServer(void) {
   ...
    // 创建一个事件处理程序以接受 TCP 和 Unix 中的新连接
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
  ...
}
ログイン後にコピー

initServer が、有効な IP ポートの数に基づいて各 IP ポートのネットワーク イベントに対して aeCreateFileEvent を呼び出し、AE_READABLE イベントのリスナーを作成し、AE_READABLE イベントのハンドラーを登録することがわかります。は acceptTcpHandler 関数です。

次に、acceptTcpHandler の実装を見てみましょう

// https://github.com/redis/redis/blob/5.0/src/networking.c#L734
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
       // 用于accept客户端的连接,其返回值是客户端对应的socket
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        // 会调用acceptCommonHandler对连接以及客户端进行初始化
        acceptCommonHandler(cfd,0,cip);
    }
}

// https://github.com/redis/redis/blob/5.0/src/networking.c#L664  
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    // 分配并初始化新客户端
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    // 判断当前连接的客户端是否超过最大值,如果超过的话,会拒绝这次连接。否则,更新客户端连接数的计数
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    ...
}

// 使用多路复用,需要记录每个客户端的状态,client 之前通过链表保存
typedef struct client {
int fd; // 字段是客户端套接字文件描述符
sds querybuf; // 保存客户端发来命令请求的输入缓冲区。以Redis通信协议的方式保存
int argc; // 当前命令的参数数量
robj **argv;  // 当前命令的参数
redisDb *db; // 当前选择的数据库指针
int flags;
list *reply; // 保存命令回复的链表。因为静态缓冲区大小固定,主要保存固定长度的命令回复,当处理一些返回大量回复的命令,则会将命令回复以链表的形式连接起来。
// ... many other fields ...
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));
    
    // 如果fd为-1,表示创建的是一个无网络连接的伪客户端,用于执行lua脚本的时候。
    // 如果fd不等于-1,表示创建一个有网络连接的客户端
    if (fd != -1) {
        // 设置fd为非阻塞模式
        anetNonBlock(NULL,fd);
        // 禁止使用 Nagle 算法,client向内核递交的每个数据包都会立即发送给server出去,TCP_NODELAY
        anetEnableTcpNoDelay(NULL,fd);
        // 如果开启了tcpkeepalive,则设置 SO_KEEPALIVE
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
         // 创建一个文件事件状态el,且监听读事件,开始接受命令的输入
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    ...
    // 初始化client 中的参数
    return c;
}
ログイン後にコピー

1. AcceptTcpHandler は主にクライアントとの接続の確立を処理するために使用されます;

2. 関数 anetTcpAcceptクライアントを受け入れるために呼び出されます。戻り値はクライアントに対応するソケットです。

3. 次に、acceptCommonHandler を呼び出して、接続とクライアントを初期化します。

4. クライアントを初期化するときは、次を使用します。 aeCreateFileEvent を同時に実行します。監視対象のイベントとイベントに対応する処理関数を登録し、readQueryFromClient コマンド読み取りプロセッサを新しい接続に対応するファイル記述子にバインドします。

5. サーバーは読み取りを監視します。ファイル記述子のイベント。クライアントがコマンドを送信して AE_READABLE イベントをトリガーすると、コールバック関数 readQueryFromClient() が呼び出され、ファイル記述子 fd からコマンドが読み取られ、入力バッファ querybuf に保存されます。

コマンドの受信

readQueryFromClient は、リクエスト処理の開始点であり、クライアントのリクエスト コマンドを解析して実行します。

// https://github.com/redis/redis/blob/5.0/src/networking.c#L1522
// 读取client的输入缓冲区的内容
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);
    ...
    // 输入缓冲区的长度
    qblen = sdslen(c->querybuf);
    // 更新缓冲区的峰值
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    // 扩展缓冲区的大小
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // 调用read从描述符为fd的客户端socket中读取数据
    nread = read(fd, c->querybuf+qblen, readlen);
    ...
    // 处理读取的内容
    processInputBufferAndReplicate(c);
}
// https://github.com/redis/redis/blob/5.0/src/networking.c#L1507
void processInputBufferAndReplicate(client *c) {
    // 当前客户端不属于主从复制中的Master 
    // 直接调用 processInputBuffer,对客户端输入缓冲区中的命令和参数进行解析
    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c);
    // 客户端属于主从复制中的Master 
    // 调用processInputBuffer函数,解析客户端命令,  
    // 调用replicationFeedSlavesFromMasterStream 函数,将主节点接收到的命令同步给从节点
    } else {
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}

// https://github.com/redis/redis/blob/5.0/src/networking.c#L1428
void processInputBuffer(client *c) {
    server.current_client = c;

    /* Keep processing while there is something in the input buffer */
    // 持续读取缓冲区的内容
    while(c->qb_pos < sdslen(c->querybuf)) {
        ...
        /* Multibulk processing could see a <= 0 length. */
        // 如果参数为0,则重置client
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            // 执行命令成功后重置client
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    /* Update the applied replication offset of our master. */
                    c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
                }

                // 命令处于阻塞状态中的客户端,不需要进行重置
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            /* freeMemoryIfNeeded may flush slave output buffers. This may
             * result into a slave, that may be the active client, to be
             * freed. */
            if (server.current_client == NULL) break;
        }
    }

    /* Trim to pos */
    if (server.current_client != NULL && c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }

    server.current_client = NULL;
}
ログイン後にコピー

1. readQueryFromClient()、ファイル記述子 fd から入力バッファ querybuf にデータを読み取ります;

2. processInputBuffer 関数を使用してコマンドの解析を完了し、次のコマンドで processInlineBuffer を使用します。または、processMultibulkBuffer が Redis プロトコルに従ってコマンドを解析します;

3. コマンドの解析が完了したら、processCommand を使用してコマンドを実行します;

4. コマンドの実行が完了したら、最後に addReply 関数ファミリーのメンバーを呼び出します。一連の関数は、クライアントに対応する書き込みバッファー (client->buf または client->reply) に応答データを書き込みます。client->buf は、優先される書き込みバッファーです。 16KB の固定サイズ。一般的に言えば、バッファリングできます。十分な応答データがありますが、クライアントが時間枠内で非常に大きなデータに応答する必要がある場合は、自動的にクライアント -> 応答リンク リストに切り替わります。リンク リストは理論的には無限に大きなデータ (マシンの物理メモリによって制限される) を保存でき、最終的にクライアントを LIFO キューに追加します client_pending_write;

コマンド応答

Before Redis イベントドリブン フレームワークは、ループするたびにイベント処理関数に入ります。監視対象のトリガー イベントまたは時間指定イベントを処理する前に、beforeSleep 関数が呼び出されて、何らかのタスク処理を実行します。これには、handleClientsWithPendingWrites 関数の呼び出しが含まれます。 Redis サーバー クライアント バッファ ゾーン内のデータはクライアントに書き戻されます。

// https://github.com/redis/redis/blob/5.0/src/server.c#L1380
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    ...
    // 将 Redis sever 客户端缓冲区中的数据写回客户端
    handleClientsWithPendingWrites();
    ...
}

// https://github.com/redis/redis/blob/5.0/src/networking.c#L1082
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    // 遍历 clients_pending_write 队列,调用 writeToClient 把 client 的写出缓冲区里的数据回写到客户端
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);
        ...
        // 调用 writeToClient 函数,将客户端输出缓冲区中的数据写回
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

         // 如果输出缓冲区的数据还没有写完,此时,handleClientsWithPendingWrites 函数就
        // 会调用 aeCreateFileEvent 函数,创建可写事件,并设置回调函数 sendReplyToClien
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_flags |= AE_BARRIER;
            }
            // 将文件描述符fd和AE_WRITABLE事件关联起来,当客户端可写时,就会触发事件,调用sendReplyToClient()函数,执行写事件
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                    freeClientAsync(c);
            }
        }
    }
    return processed;
}

// https://github.com/redis/redis/blob/5.0/src/networking.c#L1072
// 写事件处理程序,只是发送回复给client
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    writeToClient(fd,privdata,1);
}

// https://github.com/redis/redis/blob/5.0/src/networking.c#L979
// 将输出缓冲区的数据写给client,如果client被释放则返回C_ERR,没被释放则返回C_OK
int writeToClient(int fd, client *c, int handler_installed) {
    ssize_t nwritten = 0, totwritten = 0;
    size_t objlen;
    clientReplyBlock *o;
    
    // 如果指定的client的回复缓冲区中还有数据,则返回真,表示可以写socket
    while(clientHasPendingReplies(c)) {
        // 固定缓冲区发送未完成
        if (c->bufpos > 0) {
            // 将缓冲区的数据写到fd中
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            ...
            // 如果发送的数据等于buf的偏移量,表示发送完成
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        // 固定缓冲区发送完成,发送回复链表的内容
        } else {
            // 回复链表的第一条回复对象,和对象值的长度和所占的内存
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;

            if (objlen == 0) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }
            // 将当前节点的值写到fd中
            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;
            ...
        }
        ...
    }
    ...
    // 如果指定的client的回复缓冲区中已经没有数据,发送完成
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        // 删除当前client的可读事件的监听
        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

        /* Close connection after entire reply has been sent. */
        // 如果指定了写入按成之后立即关闭的标志,则释放client
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClient(c);
            return C_ERR;
        }
    }
    return C_OK;
}
ログイン後にコピー
1. beforeSleep 関数によって呼び出された handleClientsWithPendingWrites 関数は、clients_pending_write (データを書き戻されるクライアント) キューを走査し、writeToClient を呼び出してクライアントの書き込みバッファ内のデータをクライアントに書き込み、その後呼び出します。 writeToClient 関数は、クライアント出力バッファ内のデータをクライアントに送信します。

2、如果输出缓冲区的数据还没有写完,此时,handleClientsWithPendingWrites 函数就会调用 aeCreateFileEvent 函数,注册 sendReplyToClient 到该连接的写就绪事件,等待将后续将数据写回给客户端。

上面的执行流程总结下来就是

1、Redis Server 启动后,主线程会启动一个时间循环(Event Loop),持续监听事件;

2、client 到 server 的新连接,会调用 acceptTcpHandler 函数,之后会注册读事件 readQueryFromClient 函数,client 发给 server 的数据,都会在这个函数处理,这个函数会解析 client 的数据,找到对应的 cmd 函数执行;

3、cmd 逻辑执行完成后,server 需要写回数据给 client,调用 addReply 函数族的一系列函数将响应数据写入到对应 client 的写出缓冲区:client->buf 或者 client->reply ,client->buf 是首选的写出缓冲区,固定大小 16KB,一般来说可以缓冲足够多的响应数据,但是如果客户端在时间窗口内需要响应的数据非常大,那么则会自动切换到 client->reply 链表上去,使用链表理论上能够保存无限大的数据(受限于机器的物理内存),最后把 client 添加进一个 LIFO 队列 clients_pending_write

4、在 Redis 事件驱动框架每次循环进入事件处理函数前,来处理监听到的已触发事件或是到时的时间事件之前,都会调用 beforeSleep 函数,进行一些任务处理,这其中就包括了调用 handleClientsWithPendingWrites 函数,它会将 Redis sever 客户端缓冲区中的数据写回客户端;

  • beforeSleep 函数调用的 handleClientsWithPendingWrites 函数,会遍历 clients_pending_write(待写回数据的客户端) 队列,调用 writeToClient 把 client 的写出缓冲区里的数据回写到客户端,然后调用 writeToClient 函数,将客户端输出缓冲区中的数据发送给客户端;
  • 如果输出缓冲区的数据还没有写完,此时,handleClientsWithPendingWrites 函数就会调用 aeCreateFileEvent 函数,注册 sendReplyToClient 到该连接的写就绪事件,等待将后续将数据写回给客户端。

Redis 多IO线程

在 Redis6.0 的版本中,引入了多线程来处理 IO 任务,多线程的引入,充分利用了当前服务器多核特性,使用多核运行多线程,让多线程帮助加速数据读取、命令解析以及数据写回的速度,提升 Redis 整体性能。

Redis6.0 之前的版本用的是单线程 Reactor 模式,所有的操作都在一个线程中完成,6.0 之后的版本使用了主从 Reactor 模式。

由一个 mainReactor 线程接收连接,然后发送给多个 subReactor 线程处理,subReactor 负责处理具体的业务。

来看下 Redis 多IO线程的具体实现过程

多 IO 线程的初始化

使用 initThreadedIO 函数来初始化多 IO 线程。

// https://github.com/redis/redis/blob/6.2/src/networking.c#L3573
void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */

    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
     // 如果用户只配置了一个 I/O 线程,不需要创建新线程了,直接在主线程中处理
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    // 初始化线程
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        // 编号为0是主线程
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        // 初始化io_threads_mutex数组
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        // 初始化io_threads_pending数组
        setIOPendingCount(i, 0);
        // 主线程在启动 I/O 线程的时候会默认先锁住它,直到有 I/O 任务才唤醒它。
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        // 调用pthread_create函数创建IO线程,线程运行函数为IOThreadMain
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can&#39;t initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
ログイン後にコピー

可以看到在 initThreadedIO 中完成了对下面四个数组的初始化工作

io_threads_list 数组:保存了每个 IO 线程要处理的客户端,将数组每个元素初始化为一个 List 类型的列表;

io_threads_pending 数组:保存等待每个 IO 线程处理的客户端个数;

io_threads_mutex 数组:保存线程互斥锁;

io_threads 数组:保存每个 IO 线程的描述符。

命令的接收

Redis server 在和一个客户端建立连接后,就开始了监听客户端的可读事件,处理可读事件的回调函数就是 readQueryFromClient

// https://github.com/redis/redis/blob/6.2/src/networking.c#L2219
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    // 判断是否从客户端延迟读取数据
    if (postponeClientRead(c)) return;
    ...
}

// https://github.com/redis/redis/blob/6.2/src/networking.c#L3746
int postponeClientRead(client *c) {
    // 当多线程 I/O 模式开启、主线程没有在处理阻塞任务时,将 client 加入异步队列。
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED))) 
    {
        // 给客户端的flag添加CLIENT_PENDING_READ标记,表示推迟该客户端的读操作
        c->flags |= CLIENT_PENDING_READ;
        // 将可获得加入clients_pending_write列表
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}
ログイン後にコピー

使用 clients_pending_read 保存了需要进行延迟读操作的客户端之后,这些客户端又是如何分配给多 IO 线程执行的呢?

handleClientsWithPendingWritesUsingThreads 函数:该函数主要负责将 clients_pending_write 列表中的客户端分配给 IO 线程进行处理。

看下如何实现

// https://github.com/redis/redis/blob/6.2/src/networking.c#L3766
int handleClientsWithPendingReadsUsingThreads(void) {
    // 当多线程 I/O 模式开启,才能执行下面的流程
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    // 遍历待读取的 client 队列 clients_pending_read,
    // 根据IO线程的数量,让clients_pending_read中客户端数量对IO线程进行取模运算
    // 取模的结果就是客户端分配给对应IO线程的编号
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 设置当前 I/O 操作为读取操作,给每个 I/O 线程的计数器设置分配的任务数量,
    // 让 I/O 线程可以开始工作:只读取和解析命令,不执行
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    // 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // 忙轮询,等待所有 IO 线程完成待读客户端的处理
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    // 遍历待读取的 client 队列,清除 CLIENT_PENDING_READ标记,
    // 然后解析并执行所有 client 的命令。
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        serverAssert(!(c->flags & CLIENT_BLOCKED));
        // client 的第一条命令已经被解析好了,直接尝试执行。
        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }

        // 解析并执行 client 命令
        processInputBuffer(c);

        // 命令执行完成之后,如果 client 中有响应数据需要回写到客户端,则将 client 加入到待写出队列 clients_pending_write
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}
ログイン後にコピー

1、当客户端发送命令请求之后,会触发 Redis 主线程的事件循环,命令处理器 readQueryFromClient 被回调,多线程模式下,则会把 client 加入到 clients_pending_read 任务队列中去,后面主线程再分配到 I/O 线程去读取客户端请求命令;

2、主线程会根据 clients_pending_read 中客户端数量对IO线程进行取模运算,取模的结果就是客户端分配给对应IO线程的编号;

3、忙轮询,等待所有的线程完成读取客户端命令的操作,这一步用到了多线程的请求;

4、遍历 clients_pending_read,执行所有 client 的命令,这里就是在主线程中执行的,命令的执行是单线程的操作。

命令的回复

完成命令的读取、解析以及执行之后,客户端命令的响应数据已经存入 client->buf 或者 client->reply 中。

主循环在捕获 IO 事件的时候,beforeSleep 函数会被调用,进而调用 handleClientsWithPendingWritesUsingThreads ,写回响应数据给客户端。

// https://github.com/redis/redis/blob/6.2/src/networking.c#L3662
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    // 如果用户设置的 I/O 线程数等于 1 或者当前 clients_pending_write 队列中待写出的 client
    // 数量不足 I/O 线程数的两倍,则不用多线程的逻辑,让所有 I/O 线程进入休眠,
    // 直接在主线程把所有 client 的相应数据回写到客户端。
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    // 唤醒正在休眠的 I/O 线程(如果有的话)。
    if (!server.io_threads_active) startThreadedIO();

    /* Distribute the clients across N different lists. */
    // 和上面的handleClientsWithPendingReadsUsingThreads中的操作一样分配客户端给IO线程
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 设置当前 I/O 操作为写出操作,给每个 I/O 线程的计数器设置分配的任务数量,
    // 让 I/O 线程可以开始工作,把写出缓冲区(client->buf 或 c->reply)中的响应数据回写到客户端。
    // 可以看到写回操作也是多线程执行的
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    // 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    // 等待所有的线程完成对应的工作
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    // 最后再遍历一次 clients_pending_write 队列,检查是否还有 client 的写出缓冲区中有残留数据,
    // 如果有,那就为 client 注册一个命令回复器 sendReplyToClient,等待客户端写就绪再继续把数据回写。
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        // 检查 client 的写出缓冲区是否还有遗留数据。
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}
ログイン後にコピー

1、也是会将 client 分配给所有的 IO 线程;

2、忙轮询,等待所有的线程将缓存中的数据写回给客户端,这里写回操作使用的多线程;

3、最后再遍历 clients_pending_write,为那些还残留有响应数据的 client 注册命令回复处理器 sendReplyToClient,等待客户端可写之后在事件循环中继续回写残余的响应数据。

通过上面的分析可以得出结论,Redis 多IO线程中多线程的应用

1、解析客户端的命令的时候用到了多线程,但是对于客户端命令的执行,使用的还是单线程;

2、给客户端回复数据的时候,使用到了多线程。

来总结下 Redis 中多线程的执行过程

1、Redis Server 启动后,主线程会启动一个时间循环(Event Loop),持续监听事件;

2、client 到 server 的新连接,会调用 acceptTcpHandler 函数,之后会注册读事件 readQueryFromClient 函数,client 发给 server 的数据,都会在这个函数处理;

3、客户端发送给服务端的数据,不会类似 6.0 之前的版本使用 socket 直接去读,而是会将 client 放入到 clients_pending_read 中,里面保存了需要进行延迟读操作的客户端;

4、处理 clients_pending_read 的函数 handleClientsWithPendingReadsUsingThreads,在每次事件循环的时候都会调用;

  • 1、主线程会根据 clients_pending_read 中客户端数量对IO线程进行取模运算,取模的结果就是客户端分配给对应IO线程的编号;
  • 2、忙轮询,等待所有的线程完成读取客户端命令的操作,这一步用到了多线程的请求;
  • 3、遍历 clients_pending_read,执行所有 client 的命令,这里就是在主线程中执行的,命令的执行是单线程的操作。

5、命令执行完成以后,回复的内容还是会被写入到 client 的缓存区中,这些 client 和6.0之前的版本处理方式一样,也是会被放入到 clients_pending_write(待写回数据的客户端);

6、6.0 对于clients_pending_write 的处理使用到了多线程;

  • 1、也是会将 client 分配给所有的 IO 线程;
  • 2、忙轮询,等待所有的线程将缓存中的数据写回给客户端,这里写回操作使用的多线程;
  • 3、最后再遍历 clients_pending_write,为那些还残留有响应数据的 client 注册命令回复处理器 sendReplyToClient,等待客户端可写之后在事件循环中继续回写残余的响应数据。

原子性的单命令

通过上面的分析,我们知道,Redis 的主线程是单线程执行的,所有 Redis 中的单命令,都是原子性的。

所以对于一些场景的操作尽量去使用 Redis 中单命令去完成,就能保证命令执行的原子性。

比如对于上面的读取-修改-写回操作可以使用 Redis 中的原子计数器, INCRBY(自增)、DECRBR(自减)、INCR(加1) 和 DECR(减1) 等命令。

这些命令可以直接帮助我们处理并发控制

127.0.0.1:6379> incr test-1
(integer) 1
127.0.0.1:6379> incr test-1
(integer) 2
127.0.0.1:6379> incr test-1
(integer) 3
ログイン後にコピー

分析下源码,看看这个命令是如何实现的

// https://github.com/redis/redis/blob/6.2/src/t_string.c#L617

void incrCommand(client *c) {
    incrDecrCommand(c,1);
}

void decrCommand(client *c) {
    incrDecrCommand(c,-1);
}

void incrbyCommand(client *c) {
    long long incr;

    if (getLongLongFromObjectOrReply(c, c->argv[2], &incr, NULL) != C_OK) return;
    incrDecrCommand(c,incr);
}

void decrbyCommand(client *c) {
    long long incr;

    if (getLongLongFromObjectOrReply(c, c->argv[2], &incr, NULL) != C_OK) return;
    incrDecrCommand(c,-incr);
}
ログイン後にコピー

可以看到 INCRBY(自增)、DECRBR(自减)、INCR(加1) 和 DECR(减1)这几个命令最终都是调用的 incrDecrCommand

// https://github.com/redis/redis/blob/6.2/src/t_string.c#L579  
void incrDecrCommand(client *c, long long incr) {
    long long value, oldvalue;
    robj *o, *new;

    // 查找有没有对应的键值
    o = lookupKeyWrite(c->db,c->argv[1]);
    // 判断类型,如果value对象不是字符串类型,直接返回
    if (checkType(c,o,OBJ_STRING)) return;

    // 将字符串类型的value转换为longlong类型保存在value中
    if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return;

    // 备份旧的value
    oldvalue = value;

    // 判断 incr 的值是否超过longlong类型所能表示的范围
    // 长度的范围,十进制 64 位有符号整数
    if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) ||
        (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) {
        addReplyError(c,"increment or decrement would overflow");
        return;
    }
    // 计算新的 value值
    value += incr;

    if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT &&
        (value < 0 || value >= OBJ_SHARED_INTEGERS) &&
        value >= LONG_MIN && value <= LONG_MAX)
    {
        new = o;
        o->ptr = (void*)((long)value);
    } else {
        new = createStringObjectFromLongLongForValue(value);
        // 如果之前的 value 对象存在
        if (o) {
            // 重写为 new 的值  
            dbOverwrite(c->db,c->argv[1],new);
        } else {
            // 如果之前没有对应的 value,新设置 value 的值
            dbAdd(c->db,c->argv[1],new);
        }
    }
    // 进行通知
    signalModifiedKey(c,c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
    server.dirty++;
    addReply(c,shared.colon);
    addReply(c,new);
    addReply(c,shared.crlf);
}
ログイン後にコピー

总结

1、Redis 中的命令执行都是单线程的,所以单命令的执行都是原子性的;

2、虽然 Redis6.0 版本引入了多线程,但是仅是在接收客户端的命令和回复客户端的数据用到了多线程,实际命令的执行还是单线程在处理;

推荐学习:Redis视频教程

以上がRedis におけるコマンドのアトミック性の詳細な分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:csdn.net
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート
私たちについて 免責事項 Sitemap
PHP中国語ウェブサイト:福祉オンライン PHP トレーニング,PHP 学習者の迅速な成長を支援します!