Maison > base de données > Redis > Flux de types de données spéciaux Redis

Flux de types de données spéciaux Redis

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
Libérer: 2022-10-11 17:47:36
avant
3127 Les gens l'ont consulté

Cet article vous apporte des connaissances pertinentes sur Redis, qui présente principalement le contenu pertinent d'un flux de types de données spéciaux. Redis fournit une multitude de types de données, dont quatre spéciaux : bitmap, hyperloglog, géospatial, flux, jetons un coup d'œil. sur les problèmes liés au flux, j'espère que cela sera utile à tout le monde.

Flux de types de données spéciaux Redis

Apprentissage recommandé : Tutoriel vidéo Redis

Redis Stream est un type de données nouvellement ajouté dans la version Redis 5.0. Redis est un type de données spécialement conçu pour les files d'attente de messages.

Avant la sortie de Redis 5.0 Stream, la mise en œuvre des files d'attente de messages présentait toutes ses propres lacunes, telles que :

  • Le mode de publication et d'abonnement, qui ne peut pas être conservé et ne peut pas enregistrer les messages de manière fiable, et ne convient pas aux clients de reconnexion hors ligne. Le défaut de ne pas pouvoir lire les messages historiques ;

  • La manière dont List implémente la file d'attente des messages ne peut pas être consommée à plusieurs reprises. Un message sera supprimé une fois consommé, et le producteur doit implémenter lui-même un identifiant globalement unique.

Sur la base des problèmes ci-dessus, Redis 5.0 a lancé le type Stream, qui est également la fonctionnalité la plus importante de cette version. Il est utilisé pour implémenter parfaitement les files d'attente de messages, il prend en charge la persistance des messages, la génération automatique d'identifiants globalement uniques et. Mode de confirmation des messages, prend en charge le mode groupe de consommateurs, etc., rendant la file d'attente des messages plus stable et fiable.

Commandes communes

Commandes d'opération de file d'attente de messages de flux :

  • DEL : supprimer l'intégralité du flux
  • # XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]
    127.0.0.1:6379> XADD s1 * name sid10t
    "1665047636078-0"
    127.0.0.1:6379> XADD s1 * name sidiot
    "1665047646214-0"
    # XDEL key id [id ...]
    127.0.0.1:6379> XDEL s1 1665047646214-0
    (integer) 1
    # DEL key [key ...]
    127.0.0.1:6379> DEL s1
    (integer) 1
    Copier après la connexion

  • XLEN : interroger le longueur du message ;

Flux de types de données spéciaux Redis

Intervalle message; rXtrim : nombre de messages de file d'attente de coupe ;
  • Rrieee

  • xgroup create : Créer un groupe de consommateurs ;

  • Xreadgroup : Lire le message sous la forme d'un groupe de consommateurs ; La commande peut être utilisée pour interroger les messages « lus, mais pas encore confirmés » de tous les consommateurs de chaque groupe de consommateurs ; La commande

    XACK est utilisée pour confirmer à la file d'attente des messages que le traitement des messages est terminé
  • # XLEN key
    127.0.0.1:6379> XLEN s1
    (integer) 2
    # XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
    127.0.0.1:6379> XREAD streams s1 0-0
    1) 1) "s1"
       2) 1) 1) "1665047636078-0"
             2) 1) "name"
                2) "sid10t"
          2) 1) "1665047646214-0"
             2) 1) "name"
                2) "sidiot"
    127.0.0.1:6379> XREAD count 1 streams s1 0-0
    1) 1) "s1"
       2) 1) 1) "1665047636078-0"
             2) 1) "name"
                2) "sid10t"
        # XADD 了一条消息之后的扩展
        127.0.0.1:6379> XREAD streams s1 1665047636078-0
        1) 1) "s1"
           2) 1) 1) "1665047646214-0"
                 2) 1) "name"
                    2) "sidiot"
              2) 1) "1665053702766-0"
                 2) 1) "age"
                    2) "18"
    # XRANGE key start end [COUNT count]
    127.0.0.1:6379> XRANGE s1 - +
    1) 1) "1665047636078-0"
       2) 1) "name"
          2) "sid10t"
    2) 1) "1665047646214-0"
       2) 1) "name"
          2) "sidiot"
    3) 1) "1665053702766-0"
       2) 1) "age"
          2) "18"
    127.0.0.1:6379> XRANGE s1 1665047636078-0 1665047646214-0
    1) 1) "1665047636078-0"
       2) 1) "name"
          2) "sid10t"
    2) 1) "1665047646214-0"
       2) 1) "name"
          2) "sidiot"
    # XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]
    127.0.0.1:6379> XLEN s1
    (integer) 3
    127.0.0.1:6379> XTRIM s1 maxlen 2
    (integer) 1
    127.0.0.1:6379> XLEN s1
    (integer) 2
    Copier après la connexion
  • Scénario d'application ;

  • File d'attente des messages
  • Le producteur insère un message via la commande XADD :

    # XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]
    # 需要注意的是,XGROUP CREATE 的 streams 必须是一个存在的 streams,否则会报错;
    127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
    (error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
    # 0-0 从头开始消费,$ 从尾开始消费;
    127.0.0.1:6379> XADD myStream * name sid10t
    "1665057823181-0"
    127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
    OK
    127.0.0.1:6379> XGROUP CREATE myStream cGroup-tail $
    OK
    # XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
    127.0.0.1:6379> XREADGROUP Group cGroup-top name count 2 STREAMS myStream >
    1) 1) "myStream"
       2) 1) 1) "1665058086931-0"
             2) 1) "name"
                2) "sid10t"
          2) 1) "1665058090167-0"
             2) 1) "name"
                2) "sidiot"
    Copier après la connexion

    Après une insertion réussie, l'ID globalement unique : "1665058759764-0" sera renvoyé. L'ID globalement unique du message se compose de deux parties :
  • La première partie "1665058759764" est l'heure actuelle du serveur calculée en millisecondes lorsque les données sont insérées
  • La deuxième partie représente le message inséré dans la milliseconde actuelle ; Numéro de série, numéroté à partir de 0. Par exemple, « 1665058759764-0 » signifie le premier message dans les « 1665058759764 » millisecondes.

Lorsque les consommateurs lisent des messages de la file d'attente de messages via la commande XREAD, ils peuvent spécifier un ID de message et commencer la lecture à partir du message suivant de cet ID de message (notez que la lecture commence à partir du message suivant de l'ID de message d'entrée, et non un message interrogeant l'ID d'entrée).

# * 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID
# 往名称为 mymq 的消息队列中插入一条消息,消息的键是 name,值是 sid10t
127.0.0.1:6379> XADD mymq * name sid10t
"1665058759764-0"
Copier après la connexion

Si vous souhaitez implémenter un blocage de lecture (blocage lorsqu'il n'y a pas de données), vous pouvez définir l'élément de configuration BLOCK lors de l'appel de XRAED pour implémenter une opération de blocage de lecture similaire à BRPOP.

Par exemple, la commande suivante définit l'élément de configuration du BLOC 10000. L'unité de 10000 est la milliseconde, ce qui indique que lorsque XREAD lit le dernier message, si aucun message n'arrive, XREAD bloquera pendant 10000 millisecondes (soit 10 secondes) et puis reviens. La méthode de base de

127.0.0.1:6379> XREAD STREAMS mymq 1665058759764-0
(nil)
127.0.0.1:6379> XREAD STREAMS mymq 1665058759763-0
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"
Copier après la connexion
Stream utilise xadd pour stocker les messages et xread pour bloquer la lecture des messages en boucle afin d'implémenter une version simple de la file d'attente de messages. Le processus d'interaction est le suivant :

Ces opérations Liste introduite. précédents sont également pris en charge, jetons un coup d'œil aux fonctions uniques de Stream.

    Stream peut utiliser XGROUP pour créer un groupe de consommateurs. Après avoir créé un groupe de consommateurs, Stream peut utiliser la commande XREADGROUP pour permettre aux consommateurs du groupe de consommateurs de lire les messages.
  • Créez deux groupes de consommateurs. La file d'attente de messages consommée par ces deux groupes de consommateurs est mymq. Ils spécifient tous deux de commencer la lecture à partir du premier message :

    # 命令最后的 $ 符号表示读取最新的消息
    127.0.0.1:6379> XREAD BLOCK 10000 STREAMS mymq $
    (nil)
    (10.01s)
    Copier après la connexion
  • Consumer1 dans le groupe de consommateurs group1 lit tous les messages de la file d'attente de messages mymq. est la suivante :
  • # 创建一个名为 group1 的消费组,0-0 表示从第一条消息开始读取。
    127.0.0.1:6379> XGROUP CREATE mymq group1 0-0
    OK
    # 创建一个名为 group2 的消费组,0-0 表示从第一条消息开始读取。
    127.0.0.1:6379> XGROUP CREATE mymq group2 0-0
    OK
    Copier après la connexion

    Une fois que le message dans la file d'attente des messages est lu par un consommateur du groupe de consommateurs, il ne peut plus être lu par les autres consommateurs du groupe de consommateurs, c'est-à-dire que les consommateurs du même groupe de consommateurs ne peuvent pas consommer le même message.

    比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:

    127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
    (nil)
    Copier après la connexion

    但是,不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息) 。

    比如说,刚才 group1 消费组里的 consumer1 消费者消费了一条 id 为 1665058759764-0 的消息,现在用 group2 消费组里的 consumer1 消费者消费消息:

    127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 STREAMS mymq >
    1) 1) "mymq"
       2) 1) 1) "1665058759764-0"
             2) 1) "name"
                2) "sid10t"
    Copier après la connexion

    因为我创建两组的消费组都是从第一条消息开始读取,所以可以看到第二组的消费者依然可以消费 id 为 1665058759764-0 的这一条消息。因此,不同的消费组的消费者可以消费同一条消息。

    使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

    例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。

    # 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息
    127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq >
    1) 1) "mymq"
       2) 1) 1) "1665060632864-0"
             2) 1) "name"
                2) "sid10t"
                
    # 让 group2 中的 consumer2 从 mymq 消息队列中消费一条消息
    127.0.0.1:6379> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq >
    1) 1) "mymq"
       2) 1) 1) "1665060633903-0"
             2) 1) "name"
                2) "sid10t"
                
    # 让 group2 中的 consumer3 从 mymq 消息队列中消费一条消息
    127.0.0.1:6379> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq >
    1) 1) "mymq"
       2) 1) 1) "1665060634962-0"
             2) 1) "name"
                2) "sid10t"
    Copier après la connexion

    基于 Stream 实现的消息队列,如何保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息?

    Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams “消息已经处理完成”。

    消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示:

    Flux de types de données spéciaux Redis

    如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。

    例如,我们来查看一下 group2 中各个消费者已读取、但尚未确认的消息个数,命令如下:

    127.0.0.1:6379> XPENDING mymq group2
    1) (integer) 4
    2) "1665058759764-0"
    3) "1665060634962-0"
    4) 1) 1) "consumer1"
          2) "2"
       2) 1) "consumer2"
          2) "1"
       3) 1) "consumer3"
          2) "1"
    Copier après la connexion

    如果想查看某个消费者具体读取了哪些数据,可以执行下面的命令:

    # 查看 group2 里 consumer2 已从 mymq 消息队列中读取了哪些消息
    127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
    1) 1) "1665060633903-0"
       2) "consumer2"
       3) (integer) 1888805
       4) (integer) 1
    Copier après la connexion

    可以看到,consumer2 已读取的消息的 ID 是 1665060633903-0。

    一旦消息 1665060633903-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。

    127.0.0.1:6379> XACK mymq group2 1665060633903-0
    (integer) 1
    Copier après la connexion

    当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

    127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
    (empty array)
    Copier après la connexion

    小结

    好了,基于 Stream 实现的消息队列就说到这里了,小结一下:

    • 消息保序:XADD/XREAD

    • 阻塞读取:XREAD block

    • 重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;

    • 消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消费组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;

    • 支持消费组形式消费数据

    Redis 基于 Stream 消息队列与专业的消息队列有哪些差距?

    一个专业的消息队列,必须要做到两大块:

    • 消息不可丢。

    • 消息可堆积。

    1、Redis Stream 消息会丢失吗?

    使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。

    Flux de types de données spéciaux Redis

    Redis Stream 消息队列能不能保证三个环节都不丢失数据?

    • Redis 生产者会不会丢消息?生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。 从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到 ( MQ 中间件) 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。

    • Redis 消费者会不会丢消息?不会,因为 Stream ( MQ 中间件)会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,但是未被确认的消息。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。等到消费者执行完业务逻辑后,再发送消费确认 XACK 命令,也能保证消息的不丢失。

    • Redis 消息中间件会不会丢消息?会,Redis 在以下 2 个场景下,都会导致数据丢失:

    La persistance AOF est configurée pour écrire sur le disque toutes les secondes, mais ce processus d'écriture sur disque est asynchrone, et il existe une possibilité de perte de données lorsque Redis tombe en panne

    La réplication maître-esclave est également asynchrone, et il y a aussi ; perte lorsque le maître-esclave change. Possibilités de données (ouvre une nouvelle fenêtre).

    Comme vous pouvez le constater, Redis ne peut pas garantir que les messages ne seront pas perdus dans le lien middleware de la file d'attente. Un middleware de file d'attente professionnel tel que RabbitMQ ou Kafka est utilisé pour déployer un cluster. Lorsqu'un producteur publie un message, le middleware de file d'attente écrit généralement « plusieurs nœuds », c'est-à-dire qu'il y a plusieurs copies de cette manière, même si l'un des nœuds. échoue, les données du cluster ne seront pas perdues.

    2. Les messages Redis Stream peuvent-ils être accumulés ?

    Les données Redis sont stockées en mémoire, ce qui signifie qu'une fois qu'un retard de messages se produit, la mémoire Redis continuera de croître si elle dépasse la limite de mémoire de la machine, elle sera confrontée au risque de MOO.

    Donc Redis' Stream fournit la fonction de spécifier la longueur maximale de la file d'attente, histoire d'éviter cette situation.

    Lors de la spécification de la longueur maximale de la file d'attente, une fois que la longueur de la file d'attente dépasse la limite supérieure, les anciens messages seront supprimés et seuls les nouveaux messages de longueur fixe seront conservés. De ce point de vue, si Stream a une longueur maximale spécifiée lorsque les messages sont en retard, les messages peuvent toujours être perdus.

    Mais les données des files d'attente de messages professionnelles telles que Kafka et RabbitMQ sont stockées sur le disque. Lorsque les messages sont en retard, ils occupent simplement plus d'espace disque.

    Par conséquent, lorsque vous utilisez Redis comme file d'attente, vous serez confronté à deux problèmes :

    • Redis lui-même peut perdre des données

    • Face à la compression des messages, les ressources mémoire seront limitées ; La possibilité d'utiliser Redis comme file d'attente de messages dépend de votre scénario commercial :

    Si votre scénario commercial est assez simple, peu sensible à la perte de données et que la probabilité d'un retard de messages est relativement faible, utilisez Redis comme file d'attente de messages. Il est tout à fait possible de créer une file d'attente.

    • Si votre entreprise a une grande quantité de messages, la probabilité d'un retard de messages est relativement élevée et la perte de données ne peut pas être acceptée, il est alors préférable d'utiliser un middleware de file d'attente de messages professionnel.

    • Supplément : Pourquoi le mécanisme de publication/abonnement Redis ne peut-il pas être utilisé comme file d'attente de messages ?

    • Le mécanisme de publication-abonnement présente les inconvénients suivants, tous liés à la perte de données :

    Le mécanisme de publication/abonnement n'est implémenté sur la base d'aucun type de données, il n'a donc pas la capacité de "persistance des données", ce qui est lié au mécanisme de publication/abonnement. Les opérations ne seront pas écrites dans RDB et AOF. Lorsque Redis plante et redémarre, toutes les données du mécanisme de publication/abonnement seront perdues.

    • Le modèle de publication et d'abonnement est un mode de travail "envoyer et oublier". Si un abonné se déconnecte et se reconnecte, il ne peut pas consommer les messages historiques précédents.

    • Lorsque le consommateur a un certain retard de messages, c'est-à-dire les messages envoyés par le producteur, et que le consommateur ne peut pas les consommer, s'il dépasse 32M ou reste au-dessus de 8M dans les 60s, le consommateur sera déconnecté de force. Le paramètre est défini dans le fichier de configuration et la valeur par défaut est client-output-buffer-limit pubsub 32mb 8mb 60.

    • Par conséquent, le mécanisme de publication/abonnement ne convient qu'aux scénarios de communication instantanée. Par exemple, le scénario de création d'un cluster sentinelle (ouvre une nouvelle fenêtre) utilise le mécanisme de publication/abonnement.

    • Apprentissage recommandé :
    Tutoriel vidéo Redis

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:juejin.im
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal