In diesem Artikel wird ein Benutzerdienst simuliert und Redis als Datenspeicherserver verwendet.
Beinhaltet zwei Java Beans, Benutzer und Rechte
public class User { private long id; private String name; // 标签 private String label; // 收货地址经度 private Double deliveryAddressLon; // 收货地址维度 private Double deliveryAddressLat; // 最新签到日 private String lastSigninDay; // 积分 private Integer score; // 权益 private List<Rights> rights; ... } public class Rights { private Long id; private Long userId; private String name; ... }
Abhängigkeiten einführen
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
Redis-Konfiguration hinzufügen
spring.redis.host=192.168.56.102 spring.redis.port=6379 spring.redis.password= spring.redis.timeout=5000
SpringBoot-Startup
@SpringBootApplication public class UserServiceReactive { public static void main(String[] args) { new SpringApplicationBuilder( UserServiceReactive.class) .web(WebApplicationType.REACTIVE).run(args); } }
Nach dem Start der Anwendung generiert Spring automatisch ReactiveRedisTemplate (das zugrunde liegende Framework ist Lettuce).
ReactiveRedisTemplate ähnelt RedisTemplate, bietet jedoch eine asynchrone, reaktionsfähige Redis-Interaktionsmethode.
Ich möchte noch einmal betonen, dass reaktive Programmierung asynchron ist und den Thread nach dem Senden einer Redis-Anfrage nicht blockiert und der aktuelle Thread andere Aufgaben ausführen kann.
Nachdem die Redis-Antwortdaten zurückgegeben wurden, plant ReactiveRedisTemplate den Thread zur Verarbeitung der Antwortdaten.
Reaktive Programmierung kann asynchrone Aufrufe implementieren und asynchrone Ergebnisse auf elegante Weise verarbeiten, was ihre größte Bedeutung darstellt.
Die von ReactiveRedisTemplate verwendete Standardserialisierung ist die JDK-Serialisierung.
@Bean public RedisSerializationContext redisSerializationContext() { RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext(); builder.key(StringRedisSerializer.UTF_8); builder.value(RedisSerializer.json()); builder.hashKey(StringRedisSerializer.UTF_8); builder.hashValue(StringRedisSerializer.UTF_8); return builder.build(); } @Bean public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) { RedisSerializationContext serializationContext = redisSerializationContext(); ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext); return reactiveRedisTemplate; }
Die Methode builder.hashValue gibt die Serialisierungsmethode des Redis-Listenwerts an strings, daher ist es immer noch auf StringRedisSerializer.UTF_8 gesetzt.
ReactiveRedisTemplate unterstützt grundlegende Datentypen wie Redis-Strings, Hashes, Listen, Mengen, geordnete Mengen usw.
Dieser Artikel verwendet Hashes zum Speichern von Benutzerinformationen und Listen zum Speichern von Benutzerrechten. Dieser Artikel geht nicht auf die Verwendung anderer grundlegender Datentypen ein. Die
public Mono<Boolean> save(User user) { ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash(); Mono<Boolean> userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user)); if(user.getRights() != null) { ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList(); opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> { logger.info("add rights:{}", l); }); } return userRs; }
beanToMap-Methode ist für die Konvertierung der Benutzerklasse in eine Karte verantwortlich.
Die Redis HyperLogLog-Struktur kann die Anzahl verschiedener Elemente in einer Sammlung zählen.
Verwenden Sie HyperLogLog, um die Anzahl der Benutzer zu zählen, die sich täglich anmelden.
public Mono<Long> login(User user) { ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog(); return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId()); }
Redis BitMap (Bitmap) verwendet ein Bit, um den Wert oder Status eines Elements darzustellen. Da Bit die kleinste Computerspeichereinheit ist, spart die Verwendung als Speicher Platz.
Verwenden Sie BitMap, um aufzuzeichnen, ob der Benutzer diese Woche eingecheckt hat
public void addSignInFlag(long userId) { String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16); redisTemplate.opsForValue().setBit( key, userId & 0xffff , true) .subscribe(b -> logger.info("set:{},result:{}", key, b)); }
Die oberen 48 Bits der Benutzer-ID werden verwendet, um Benutzer in verschiedene Schlüssel zu unterteilen, und die niedrigen 16 Bits werden als Bitmap-Offset-Parameter-Offset verwendet.
Der Offset-Parameter muss größer oder gleich 0 und kleiner als 2^32 sein (Bit-Mapping ist auf 512 MB begrenzt).
Redis Geo kann geografische Standortinformationen speichern und den geografischen Standort berechnen.
Wenn Sie Lagerinformationen innerhalb eines bestimmten Bereichs finden möchten
public Flux getWarehouseInDist(User u, double dist) { ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo(); Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist); RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending(); return geo.radius("warehouse:address", circle, args); }
warehouse:address
In dieser Sammlung müssen Sie zuerst die Informationen zum geografischen Standort des Lagers speichern. warehouse:address
这个集合中需要先保存好仓库地理位置信息。
ReactiveGeoOperations#radius方法可以查找集合中地理位置在给定范围内的元素,它中还支持添加元素到集合,计算集合中两个元素地理位置距离等操作。
ReactiveRedisTemplate也可以执行Lua脚本。
下面通过Lua脚本完成用户签到逻辑:如果用户今天未签到,允许签到,积分加1,如果用户今天已签到,则拒接操作。
public Flux<String> addScore(long userId) { DefaultRedisScript<String> script = new DefaultRedisScript<>(); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua"))); List<String> keys = new ArrayList<>(); keys.add(String.valueOf(userId)); keys.add(LocalDateTime.now().toString().substring(0, 10)); return redisTemplate.execute(script, keys); }
signin.lua内容如下
local score=redis.call('hget','user:'..KEYS[1],'score') local day=redis.call('hget','user:'..KEYS[1],'lastSigninDay') if(day==KEYS[2]) then return '0' else redis.call('hset','user:'..KEYS[1],'score', score+1,'lastSigninDay',KEYS[2]) return '1' end
Redis Stream 是 Redis 5.0 版本新增加的数据类型。该类型可以实现消息队列,并提供消息的持久化和主备复制功能,并且可以记住每一个客户端的访问位置,还能保证消息不丢失。
Redis借鉴了kafka的设计,一个Stream内可以存在多个消费组,一个消费组内可以存在多个消费者。
如果一个消费组内某个消费者消费了Stream中某条消息,则这消息不会被该消费组其他消费者消费到,当然,它还可以被其他消费组中某个消费者消费到。
下面定义一个Stream消费者,负责处理接收到的权益数据
@Component public class RightsStreamConsumer implements ApplicationRunner, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class); @Autowired private RedisConnectionFactory redisConnectionFactory; private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container; // Stream队列 private static final String STREAM_KEY = "stream:user:rights"; // 消费组 private static final String STREAM_GROUP = "user-service"; // 消费者 private static final String STREAM_CONSUMER = "consumer-1"; @Autowired @Qualifier("reactiveRedisTemplate") private ReactiveRedisTemplate redisTemplate; public void run(ApplicationArguments args) throws Exception { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(100) //一批次拉取的最大count数 .executor(Executors.newSingleThreadExecutor()) //线程池 .pollTimeout(Duration.ZERO) //阻塞式轮询 .targetType(Rights.class) //目标类型(消息内容的类型) .build(); // 创建一个消息监听容器 container = StreamMessageListenerContainer.create(redisConnectionFactory, options); // prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP) .subscribe(stream -> { // 为Stream创建一个消费者,并绑定处理类 container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER), StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()), new StreamMessageListener()); container.start(); }); } @Override public void destroy() throws Exception { container.stop(); } // 查找Stream信息,如果不存在,则创建Stream private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) { // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。 return ops.info(stream).onErrorResume(err -> { logger.warn("query stream err:{}", err.getMessage()); // createGroup方法创建Stream return ops.createGroup(stream, group).flatMap(s -> ops.info(stream)); }); } // 消息处理对象 class StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> { public void onMessage(ObjectRecord<String, Rights> message) { // 处理消息 RecordId id = message.getId(); Rights rights = message.getValue(); logger.info("receive id:{},rights:{}", id, rights); redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> { logger.info("add rights:{}", l); }); } } }
下面看一下如何发送信息
public Mono<RecordId> addRights(Rights r) { String streamKey = "stream:user:rights";//stream key ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r); Mono<RecordId> mono = redisTemplate.opsForStream().add(record); return mono; }
创建一个消息记录对象ObjectRecord,并通过ReactiveStreamOperations发送信息记录。
ReactiveRedisTemplate也支持Redis Sentinel、Cluster集群模式,只需要调整配置即可。
Sentinel配置如下
spring.redis.sentinel.master=mymaster spring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379 spring.redis.sentinel.password=
spring.redis.sentinel.nodes
配置的是Sentinel节点IP地址和端口,不是Redis实例节点IP地址和端口。
Cluster配置如下
spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379 spring.redis.lettuce.cluster.refresh.period=10000 spring.redis.lettuce.cluster.refresh.adaptive=true
如Redis Cluster中node2是node1的从节点,Lettuce中会缓存该信息,当node1宕机后,Redis Cluster会将node2升级为主节点。但Lettuce不会自动将请求切换到node2,因为它的缓冲没有刷新。
开启spring.redis.lettuce.cluster.refresh.adaptive
Die ReactiveGeoOperations#radius-Methode kann Elemente in der Sammlung finden, deren geografischer Standort innerhalb eines bestimmten Bereichs liegt. Sie unterstützt auch Vorgänge wie das Hinzufügen von Elementen zur Sammlung und die Berechnung der geografischen Entfernung zwischen zwei Elementen in der Sammlung.
spring.redis.sentinel.nodes
konfiguriert die IP-Adresse und den Port des Sentinel-Knotens, nicht die IP-Adresse und den Port des Redis-Instanzknotens. 🎜🎜Cluster-Konfiguration ist wie folgt🎜rrreee🎜Zum Beispiel ist Knoten2 im Redis-Cluster der Slave-Knoten von Knoten1, und Lettuce speichert diese Informationen zwischen. Wenn Knoten1 ausfällt, aktualisiert Redis-Cluster Knoten2 zum Master-Knoten. Aber Lettuce leitet die Anfrage nicht automatisch an Knoten2 weiter, da sein Puffer nicht geleert wird. 🎜Aktivieren Sie die spring.redis.lettuce.cluster.refresh.adaptive
-Konfiguration, um die Cache-Informationen des Redis-Clusters regelmäßig zu aktualisieren, den Knotenstatus des Clients dynamisch zu ändern und ein Failover durchzuführen. 🎜🎜Derzeit gibt es keine Lösung für ReactiveRedisTemplate zur Implementierung von Pipelines und Transaktionen. 🎜Das obige ist der detaillierte Inhalt vonSo implementieren Sie eine reaktionsfähige Redis-Interaktion im Frühjahr. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!