Maison > base de données > Redis > Comment implémenter une interaction Redis réactive au printemps

Comment implémenter une interaction Redis réactive au printemps

PHPz
Libérer: 2023-05-27 17:49:47
avant
1254 Les gens l'ont consulté

Cet article simulera un service utilisateur et utilisera Redis comme serveur de stockage de données.
Implique deux beans Java, des utilisateurs et des droits

public class User {
    private long id;
    private String name;
    // 标签
    private String label;
    // 收货地址经度
    private Double deliveryAddressLon;
    // 收货地址维度
    private Double deliveryAddressLat;
    // 最新签到日
    private String lastSigninDay;
    // 积分
    private Integer score;
    // 权益
    private List<Rights> rights;
    ...
}

public class Rights {
    private Long id;
    private Long userId;
    private String name;
    ...
}
Copier après la connexion

Démarrage

Introduire des dépendances

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>
Copier après la connexion

Ajouter une configuration Redis

spring.redis.host=192.168.56.102
spring.redis.port=6379
spring.redis.password=
spring.redis.timeout=5000
Copier après la connexion

Démarrage de SpringBoot

@SpringBootApplication
public class UserServiceReactive {
    public static void main(String[] args) {
        new SpringApplicationBuilder(
                UserServiceReactive.class)
                .web(WebApplicationType.REACTIVE).run(args);
    }
}
Copier après la connexion

Après le démarrage de l'application, Spring générera automatiquement ReactiveRedisTemplate (son framework sous-jacent est Lettuce).
ReactiveRedisTemplate est similaire à RedisTemplate, mais il fournit une méthode d'interaction Redis asynchrone et réactive.
Je voudrais souligner à nouveau que la programmation réactive est asynchrone. ReactiveRedisTemplate ne bloquera pas le thread après l'envoi d'une requête Redis et le thread actuel peut effectuer d'autres tâches.
Une fois les données de réponse Redis renvoyées, ReactiveRedisTemplate planifie ensuite le thread pour traiter les données de réponse.
La programmation réactive peut implémenter des appels asynchrones et traiter les résultats asynchrones de manière élégante, ce qui est sa plus grande importance.

Sérialisation

La sérialisation par défaut utilisée par ReactiveRedisTemplate est la sérialisation Jdk. Nous pouvons la configurer comme méthode de sérialisation json

@Bean
public RedisSerializationContext redisSerializationContext() {
    RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();
    builder.key(StringRedisSerializer.UTF_8);
    builder.value(RedisSerializer.json());
    builder.hashKey(StringRedisSerializer.UTF_8);
    builder.hashValue(StringRedisSerializer.UTF_8);

    return builder.build();
}

@Bean
public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
    RedisSerializationContext serializationContext = redisSerializationContext();
    ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext);
    return reactiveRedisTemplate;
}
Copier après la connexion

builder.hashValue pour spécifier la méthode de sérialisation de la valeur de la liste Redis puisque la valeur de la liste Redis dans cet article ne stocke que les chaînes. , Il est donc toujours défini sur StringRedisSerializer.UTF_8.

Types de données de base

ReactiveRedisTemplate prend en charge les types de données de base tels que les chaînes Redis, les hachages, les listes, les ensembles, les ensembles ordonnés, etc.
Cet article utilise des hachages pour enregistrer les informations utilisateur et des listes pour enregistrer les droits des utilisateurs. Cet article ne développe pas l'utilisation d'autres types de données de base. La méthode

public Mono<Boolean>  save(User user) {
    ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash();
    Mono<Boolean>  userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user));
    if(user.getRights() != null) {
        ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList();
        opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> {
            logger.info("add rights:{}", l);
        });
    }
    return userRs;
}
Copier après la connexion

beanToMap est responsable de la conversion de la classe User en carte.

HyperLogLog

La structure Redis HyperLogLog peut compter le nombre d'éléments différents dans une collection.
Utilisez HyperLogLog pour compter le nombre d'utilisateurs se connectant chaque jour

public Mono<Long>  login(User user) {
    ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog();
    return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId());
}
Copier après la connexion

BitMap

Redis BitMap (bitmap) utilise un Bit pour représenter la valeur ou l'état correspondant à un élément. Étant donné que Bit est la plus petite unité de stockage informatique, son utilisation pour le stockage permettra d'économiser de l'espace.
Utilisez BitMap pour enregistrer si l'utilisateur s'est enregistré cette semaine

public void addSignInFlag(long userId) {
    String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16);
    redisTemplate.opsForValue().setBit(
            key, userId & 0xffff , true)
    .subscribe(b -> logger.info("set:{},result:{}", key, b));
}
Copier après la connexion

Les 48 bits élevés de userId sont utilisés pour diviser les utilisateurs en différentes clés, et les 16 bits faibles sont utilisés comme décalage du paramètre de décalage bitmap.
Le paramètre offset doit être supérieur ou égal à 0 et inférieur à 2^32 (le bit mapping est limité à 512 Mo).

Geo

Redis Geo peut stocker des informations de localisation géographique et calculer la localisation géographique.
Si vous souhaitez trouver des informations sur l'entrepôt dans une plage donnée

public Flux getWarehouseInDist(User u, double dist) {
    ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo();
    Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist);
    RedisGeoCommands.GeoRadiusCommandArgs args =
            RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending();
    return geo.radius("warehouse:address", circle, args);
}
Copier après la connexion

warehouse:addressDans cette collection, vous devez d'abord enregistrer les informations de localisation géographique de l'entrepôt. warehouse:address这个集合中需要先保存好仓库地理位置信息。
ReactiveGeoOperations#radius方法可以查找集合中地理位置在给定范围内的元素,它中还支持添加元素到集合,计算集合中两个元素地理位置距离等操作。

Lua

ReactiveRedisTemplate也可以执行Lua脚本。
下面通过Lua脚本完成用户签到逻辑:如果用户今天未签到,允许签到,积分加1,如果用户今天已签到,则拒接操作。

public Flux<String> addScore(long userId) {
    DefaultRedisScript<String> script = new DefaultRedisScript<>();
    script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua")));
    List<String> keys = new ArrayList<>();
    keys.add(String.valueOf(userId));
    keys.add(LocalDateTime.now().toString().substring(0, 10));
    return redisTemplate.execute(script, keys);
}
Copier après la connexion

signin.lua内容如下

local score=redis.call(&#39;hget&#39;,&#39;user:&#39;..KEYS[1],&#39;score&#39;)
local day=redis.call(&#39;hget&#39;,&#39;user:&#39;..KEYS[1],&#39;lastSigninDay&#39;)
if(day==KEYS[2])
    then
    return &#39;0&#39;
else
    redis.call(&#39;hset&#39;,&#39;user:&#39;..KEYS[1],&#39;score&#39;, score+1,&#39;lastSigninDay&#39;,KEYS[2])
    return &#39;1&#39;
end
Copier après la connexion

Stream

Redis Stream 是 Redis 5.0 版本新增加的数据类型。该类型可以实现消息队列,并提供消息的持久化和主备复制功能,并且可以记住每一个客户端的访问位置,还能保证消息不丢失。

Redis借鉴了kafka的设计,一个Stream内可以存在多个消费组,一个消费组内可以存在多个消费者。
如果一个消费组内某个消费者消费了Stream中某条消息,则这消息不会被该消费组其他消费者消费到,当然,它还可以被其他消费组中某个消费者消费到。

下面定义一个Stream消费者,负责处理接收到的权益数据

@Component
public class RightsStreamConsumer implements ApplicationRunner, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class);

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container;
    // Stream队列
    private static final String STREAM_KEY = "stream:user:rights";
    // 消费组
    private static final String STREAM_GROUP = "user-service";
    // 消费者
    private static final String STREAM_CONSUMER = "consumer-1";

    @Autowired
    @Qualifier("reactiveRedisTemplate")
    private ReactiveRedisTemplate redisTemplate;

    public void run(ApplicationArguments args) throws Exception {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(100) //一批次拉取的最大count数
                        .executor(Executors.newSingleThreadExecutor())  //线程池
                        .pollTimeout(Duration.ZERO) //阻塞式轮询
                        .targetType(Rights.class) //目标类型(消息内容的类型)
                        .build();
        // 创建一个消息监听容器
        container = StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream
        prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP)
                .subscribe(stream -> {
            // 为Stream创建一个消费者,并绑定处理类
            container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                    StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
                    new StreamMessageListener());
            container.start();
        });
    }

    @Override
    public void destroy() throws Exception {
        container.stop();
    }

    // 查找Stream信息,如果不存在,则创建Stream
    private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) {
        // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。
        return ops.info(stream).onErrorResume(err -> {
            logger.warn("query stream err:{}", err.getMessage());
            // createGroup方法创建Stream
            return ops.createGroup(stream, group).flatMap(s -> ops.info(stream));
        });
    }

    // 消息处理对象
    class  StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> {
        public void onMessage(ObjectRecord<String, Rights> message) {
            // 处理消息
            RecordId id = message.getId();
            Rights rights = message.getValue();
            logger.info("receive id:{},rights:{}", id, rights);
            redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> {
                logger.info("add rights:{}", l);
            });
        }
    }
}
Copier après la connexion

下面看一下如何发送信息

public Mono<RecordId> addRights(Rights r) {
    String streamKey = "stream:user:rights";//stream key
    ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r);
    Mono<RecordId> mono = redisTemplate.opsForStream().add(record);
    return mono;
}
Copier après la connexion

创建一个消息记录对象ObjectRecord,并通过ReactiveStreamOperations发送信息记录。

Sentinel、Cluster

ReactiveRedisTemplate也支持Redis Sentinel、Cluster集群模式,只需要调整配置即可。
Sentinel配置如下

spring.redis.sentinel.master=mymaster
spring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379
spring.redis.sentinel.password=
Copier après la connexion

spring.redis.sentinel.nodes配置的是Sentinel节点IP地址和端口,不是Redis实例节点IP地址和端口。

Cluster配置如下

spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379
spring.redis.lettuce.cluster.refresh.period=10000
spring.redis.lettuce.cluster.refresh.adaptive=true
Copier après la connexion

如Redis Cluster中node2是node1的从节点,Lettuce中会缓存该信息,当node1宕机后,Redis Cluster会将node2升级为主节点。但Lettuce不会自动将请求切换到node2,因为它的缓冲没有刷新。
开启spring.redis.lettuce.cluster.refresh.adaptiveLa méthode ReactiveGeoOperations#radius peut trouver des éléments de la collection dont l'emplacement géographique se situe dans une plage donnée. Elle prend également en charge des opérations telles que l'ajout d'éléments à la collection et le calcul de la distance géographique entre deux éléments de la collection.

Lua

ReactiveRedisTemplate peut également exécuter des scripts Lua. 🎜La logique d'enregistrement de l'utilisateur est complétée via le script Lua ci-dessous : si l'utilisateur ne s'est pas enregistré aujourd'hui, l'enregistrement est autorisé et les points sont augmentés de 1. Si l'utilisateur s'est déjà enregistré aujourd'hui, l'opération est rejeté. Le contenu de 🎜rrreee🎜signin.lua est le suivant🎜rrreee

Stream🎜🎜Redis Stream est un type de données nouvellement ajouté dans la version Redis 5.0. Ce type peut implémenter des files d'attente de messages et fournir des fonctions de persistance des messages et de réplication de sauvegarde principale. Il peut mémoriser l'emplacement d'accès de chaque client et garantir que les messages ne sont pas perdus. 🎜🎜Redis s'appuie sur la conception de kafka. Plusieurs groupes de consommateurs peuvent exister dans un Stream, et plusieurs consommateurs peuvent exister dans un groupe de consommateurs. 🎜Si un consommateur d'un groupe de consommateurs consomme un message dans le Stream, le message ne sera pas consommé par les autres consommateurs du groupe de consommateurs. Bien entendu, il peut également être consommé par un consommateur d'autres groupes de consommateurs. 🎜🎜Ce qui suit définit un consommateur de flux, responsable du traitement des données sur les capitaux propres reçues🎜rrreee🎜Voyons comment envoyer des informations🎜rrreee🎜Créez un objet d'enregistrement de message ObjectRecord et envoyez l'enregistrement d'informations via ReactiveStreamOperations. 🎜

Sentinel, Cluster🎜🎜ReactiveRedisTemplate prend également en charge le mode cluster Redis Sentinel et Cluster, il vous suffit d'ajuster la configuration. 🎜La configuration de Sentinel est la suivante🎜rrreee🎜spring.redis.sentinel.nodes configure l'adresse IP et le port du nœud Sentinel, et non l'adresse IP et le port du nœud d'instance Redis. 🎜🎜La configuration du cluster est la suivante🎜rrreee🎜Par exemple, le nœud2 dans Redis Cluster est le nœud esclave du nœud1, et Lettuce mettra en cache ces informations lorsque le nœud1 tombe en panne, Redis Cluster mettra à niveau le nœud2 vers le nœud maître. Mais Lettuce ne basculera pas automatiquement la requête vers node2 car son tampon n'est pas vidé. 🎜Activez la configuration spring.redis.lettuce.cluster.refresh.adaptive. Lettuce peut régulièrement actualiser les informations du cache du cluster Redis, modifier dynamiquement l'état du nœud du client et effectuer le basculement. 🎜🎜Il n'existe actuellement aucune solution permettant à ReactiveRedisTemplate d'implémenter un pipeline et des transactions. 🎜

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:yisu.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal