Artikel ini akan mensimulasikan perkhidmatan pengguna dan menggunakan Redis sebagai pelayan storan data.
Melibatkan dua kacang java, pengguna dan hak
public class User { private long id; private String name; // 标签 private String label; // 收货地址经度 private Double deliveryAddressLon; // 收货地址维度 private Double deliveryAddressLat; // 最新签到日 private String lastSigninDay; // 积分 private Integer score; // 权益 private List<Rights> rights; ... } public class Rights { private Long id; private Long userId; private String name; ... }
Perkenalkan kebergantungan
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
Tambah konfigurasi Redis
spring.redis.host=192.168.56.102 spring.redis.port=6379 spring.redis.password= spring.redis.timeout=5000
permulaan SpringBoot
SpringBoot startup
Selepas aplikasi dimulakan, Spring akan menjana ReactiveRedisTemplate secara automatik (rangka kerja asasnya ialah Lettuce).
ReactiveRedisTemplate adalah serupa dengan RedisTemplate, tetapi ia menyediakan kaedah interaksi Redis yang tidak segerak dan responsif.
Saya ingin menekankan sekali lagi bahawa pengaturcaraan reaktif adalah tidak segerak ReactiveRedisTemplate tidak akan menyekat utas selepas menghantar permintaan Redis, dan utas semasa boleh melaksanakan tugas lain.
Selepas data respons Redis dikembalikan, ReactiveRedisTemplate menjadualkan urutan untuk memproses data respons.
Artikel ini menggunakan cincang untuk menyimpan maklumat dan senarai pengguna untuk menyimpan hak pengguna Artikel ini tidak mengembangkan penggunaan jenis data asas yang lain.
@SpringBootApplication public class UserServiceReactive { public static void main(String[] args) { new SpringApplicationBuilder( UserServiceReactive.class) .web(WebApplicationType.REACTIVE).run(args); } }
Gunakan HyperLogLog untuk mengira bilangan pengguna log masuk setiap hari
@Bean public RedisSerializationContext redisSerializationContext() { RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext(); builder.key(StringRedisSerializer.UTF_8); builder.value(RedisSerializer.json()); builder.hashKey(StringRedisSerializer.UTF_8); builder.hashValue(StringRedisSerializer.UTF_8); return builder.build(); } @Bean public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) { RedisSerializationContext serializationContext = redisSerializationContext(); ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext); return reactiveRedisTemplate; }
Gunakan BitMap untuk merekod sama ada pengguna telah mendaftar masuk minggu ini
public Mono<Boolean> save(User user) { ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash(); Mono<Boolean> userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user)); if(user.getRights() != null) { ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList(); opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> { logger.info("add rights:{}", l); }); } return userRs; }
Parameter ofset mestilah lebih besar daripada atau sama dengan 0 dan kurang daripada 2^32 (pemetaan bit terhad kepada 512 MB).
Jika anda ingin mencari maklumat gudang dalam julat tertentu
public Mono<Long> login(User user) { ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog(); return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId()); }
Dalam koleksi ini, anda perlu menyimpan maklumat lokasi gudang terlebih dahulu. warehouse:address
Kaedah ReactiveGeoOperations#radius boleh mencari elemen dalam koleksi yang lokasi geografinya berada dalam julat tertentu Ia juga menyokong operasi seperti menambahkan elemen pada koleksi dan mengira jarak geografi antara dua elemen dalam koleksi.
Logik daftar masuk pengguna dilengkapkan melalui skrip Lua di bawah: jika pengguna belum mendaftar masuk hari ini, daftar masuk dibenarkan dan mata dinaikkan sebanyak 1. Jika pengguna telah mendaftar masuk hari ini, operasi ditolak.
public void addSignInFlag(long userId) { String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16); redisTemplate.opsForValue().setBit( key, userId & 0xffff , true) .subscribe(b -> logger.info("set:{},result:{}", key, b)); }
public Flux getWarehouseInDist(User u, double dist) { ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo(); Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist); RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending(); return geo.radius("warehouse:address", circle, args); }
Jika pengguna dalam kumpulan pengguna menggunakan mesej dalam Strim, mesej itu tidak akan digunakan oleh pengguna lain dalam kumpulan pengguna Sudah tentu, ia juga boleh digunakan oleh pengguna dalam kumpulan pengguna lain.
public Flux<String> addScore(long userId) { DefaultRedisScript<String> script = new DefaultRedisScript<>(); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua"))); List<String> keys = new ArrayList<>(); keys.add(String.valueOf(userId)); keys.add(LocalDateTime.now().toString().substring(0, 10)); return redisTemplate.execute(script, keys); }
local score=redis.call('hget','user:'..KEYS[1],'score') local day=redis.call('hget','user:'..KEYS[1],'lastSigninDay') if(day==KEYS[2]) then return '0' else redis.call('hset','user:'..KEYS[1],'score', score+1,'lastSigninDay',KEYS[2]) return '1' end
Konfigurasi Sentinel adalah seperti berikut
@Component public class RightsStreamConsumer implements ApplicationRunner, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class); @Autowired private RedisConnectionFactory redisConnectionFactory; private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container; // Stream队列 private static final String STREAM_KEY = "stream:user:rights"; // 消费组 private static final String STREAM_GROUP = "user-service"; // 消费者 private static final String STREAM_CONSUMER = "consumer-1"; @Autowired @Qualifier("reactiveRedisTemplate") private ReactiveRedisTemplate redisTemplate; public void run(ApplicationArguments args) throws Exception { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(100) //一批次拉取的最大count数 .executor(Executors.newSingleThreadExecutor()) //线程池 .pollTimeout(Duration.ZERO) //阻塞式轮询 .targetType(Rights.class) //目标类型(消息内容的类型) .build(); // 创建一个消息监听容器 container = StreamMessageListenerContainer.create(redisConnectionFactory, options); // prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP) .subscribe(stream -> { // 为Stream创建一个消费者,并绑定处理类 container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER), StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()), new StreamMessageListener()); container.start(); }); } @Override public void destroy() throws Exception { container.stop(); } // 查找Stream信息,如果不存在,则创建Stream private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) { // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。 return ops.info(stream).onErrorResume(err -> { logger.warn("query stream err:{}", err.getMessage()); // createGroup方法创建Stream return ops.createGroup(stream, group).flatMap(s -> ops.info(stream)); }); } // 消息处理对象 class StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> { public void onMessage(ObjectRecord<String, Rights> message) { // 处理消息 RecordId id = message.getId(); Rights rights = message.getValue(); logger.info("receive id:{},rights:{}", id, rights); redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> { logger.info("add rights:{}", l); }); } } }
Konfigurasi ialah alamat dan port IP nod Sentinel, bukan alamat dan port IP nod contoh Redis. spring.redis.sentinel.nodes
public Mono<RecordId> addRights(Rights r) { String streamKey = "stream:user:rights";//stream key ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r); Mono<RecordId> mono = redisTemplate.opsForStream().add(record); return mono; }
Dayakan konfigurasi
dan Lettuce boleh menyegarkan maklumat cache gugusan Redis Cluster secara kerap, menukar status nod pelanggan secara dinamik dan melengkapkan failover. spring.redis.lettuce.cluster.refresh.adaptive
Atas ialah kandungan terperinci Bagaimana untuk melaksanakan interaksi Redis responsif dalam Spring. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!