Rumah > pangkalan data > Redis > Bagaimana untuk melaksanakan interaksi Redis responsif dalam Spring

Bagaimana untuk melaksanakan interaksi Redis responsif dalam Spring

PHPz
Lepaskan: 2023-05-27 17:49:47
ke hadapan
1238 orang telah melayarinya

Artikel ini akan mensimulasikan perkhidmatan pengguna dan menggunakan Redis sebagai pelayan storan data.
Melibatkan dua kacang java, pengguna dan hak

public class User {
    private long id;
    private String name;
    // 标签
    private String label;
    // 收货地址经度
    private Double deliveryAddressLon;
    // 收货地址维度
    private Double deliveryAddressLat;
    // 最新签到日
    private String lastSigninDay;
    // 积分
    private Integer score;
    // 权益
    private List<Rights> rights;
    ...
}

public class Rights {
    private Long id;
    private Long userId;
    private String name;
    ...
}
Salin selepas log masuk

Permulaan

Perkenalkan kebergantungan

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>
Salin selepas log masuk

Tambah konfigurasi Redis

spring.redis.host=192.168.56.102
spring.redis.port=6379
spring.redis.password=
spring.redis.timeout=5000
Salin selepas log masuk

permulaan SpringBoot

SpringBoot startup


Selepas aplikasi dimulakan, Spring akan menjana ReactiveRedisTemplate secara automatik (rangka kerja asasnya ialah Lettuce).
ReactiveRedisTemplate adalah serupa dengan RedisTemplate, tetapi ia menyediakan kaedah interaksi Redis yang tidak segerak dan responsif.
Saya ingin menekankan sekali lagi bahawa pengaturcaraan reaktif adalah tidak segerak ReactiveRedisTemplate tidak akan menyekat utas selepas menghantar permintaan Redis, dan utas semasa boleh melaksanakan tugas lain.
Selepas data respons Redis dikembalikan, ReactiveRedisTemplate menjadualkan urutan untuk memproses data respons.

Pengaturcaraan reaktif boleh melaksanakan panggilan tak segerak dan memproses hasil tak segerak dengan cara yang elegan, yang merupakan kepentingan terbesarnya.

Serialization

Serialization yang digunakan oleh ReactiveRedisTemplate secara lalai ialah Jdk serialization. kaedah nilai, memandangkan nilai senarai Redis dalam artikel ini hanya menyimpan rentetan, ia masih ditetapkan kepada StringRedisSerializer.UTF_8.

Jenis data asas

ReactiveRedisTemplate menyokong jenis data asas seperti rentetan Redis, cincangan, senarai, set dan set tersusun.

Artikel ini menggunakan cincang untuk menyimpan maklumat dan senarai pengguna untuk menyimpan hak pengguna Artikel ini tidak mengembangkan penggunaan jenis data asas yang lain.

@SpringBootApplication
public class UserServiceReactive {
    public static void main(String[] args) {
        new SpringApplicationBuilder(
                UserServiceReactive.class)
                .web(WebApplicationType.REACTIVE).run(args);
    }
}
Salin selepas log masuk

kaedah beanToMap bertanggungjawab untuk menukar kelas Pengguna kepada peta.

HyperLogLog

Struktur Redis HyperLogLog boleh mengira bilangan elemen berbeza dalam koleksi.

Gunakan HyperLogLog untuk mengira bilangan pengguna log masuk setiap hari

@Bean
public RedisSerializationContext redisSerializationContext() {
    RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();
    builder.key(StringRedisSerializer.UTF_8);
    builder.value(RedisSerializer.json());
    builder.hashKey(StringRedisSerializer.UTF_8);
    builder.hashValue(StringRedisSerializer.UTF_8);

    return builder.build();
}

@Bean
public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
    RedisSerializationContext serializationContext = redisSerializationContext();
    ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext);
    return reactiveRedisTemplate;
}
Salin selepas log masuk

BitMap

Redis BitMap (bitmap) menggunakan Bit untuk mewakili nilai atau status yang sepadan dengan elemen. Memandangkan Bit ialah unit storan komputer terkecil, menggunakannya untuk penyimpanan akan menjimatkan ruang.

Gunakan BitMap untuk merekod sama ada pengguna telah mendaftar masuk minggu ini

public Mono<Boolean>  save(User user) {
    ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash();
    Mono<Boolean>  userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user));
    if(user.getRights() != null) {
        ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList();
        opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> {
            logger.info("add rights:{}", l);
        });
    }
    return userRs;
}
Salin selepas log masuk

Id pengguna 48 bit tinggi digunakan untuk membahagikan pengguna kepada kekunci yang berbeza, dan 16 bit rendah digunakan sebagai parameter mengimbangi bitmap. .

Parameter ofset mestilah lebih besar daripada atau sama dengan 0 dan kurang daripada 2^32 (pemetaan bit terhad kepada 512 MB).

Geo

Redis Geo boleh menyimpan maklumat lokasi geografi dan mengira lokasi geografi.

Jika anda ingin mencari maklumat gudang dalam julat tertentu

public Mono<Long>  login(User user) {
    ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog();
    return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId());
}
Salin selepas log masuk

Dalam koleksi ini, anda perlu menyimpan maklumat lokasi gudang terlebih dahulu. warehouse:addressKaedah ReactiveGeoOperations#radius boleh mencari elemen dalam koleksi yang lokasi geografinya berada dalam julat tertentu Ia juga menyokong operasi seperti menambahkan elemen pada koleksi dan mengira jarak geografi antara dua elemen dalam koleksi.

Lua

ReactiveRedisTemplate juga boleh melaksanakan skrip Lua.

Logik daftar masuk pengguna dilengkapkan melalui skrip Lua di bawah: jika pengguna belum mendaftar masuk hari ini, daftar masuk dibenarkan dan mata dinaikkan sebanyak 1. Jika pengguna telah mendaftar masuk hari ini, operasi ditolak.

public void addSignInFlag(long userId) {
    String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16);
    redisTemplate.opsForValue().setBit(
            key, userId & 0xffff , true)
    .subscribe(b -> logger.info("set:{},result:{}", key, b));
}
Salin selepas log masuk

Kandungan signin.lua adalah seperti berikut

public Flux getWarehouseInDist(User u, double dist) {
    ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo();
    Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist);
    RedisGeoCommands.GeoRadiusCommandArgs args =
            RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending();
    return geo.radius("warehouse:address", circle, args);
}
Salin selepas log masuk

Strim

Redis Stream ialah jenis data yang baru ditambah dalam versi Redis 5.0. Jenis ini boleh melaksanakan baris gilir mesej, menyediakan ketekunan mesej dan fungsi replikasi siap sedia utama, dan boleh mengingati lokasi akses setiap pelanggan dan memastikan mesej tidak hilang.

Redis menggunakan reka bentuk kafka Berbilang kumpulan pengguna boleh wujud dalam Strim, dan berbilang pengguna boleh wujud dalam kumpulan pengguna.

Jika pengguna dalam kumpulan pengguna menggunakan mesej dalam Strim, mesej itu tidak akan digunakan oleh pengguna lain dalam kumpulan pengguna Sudah tentu, ia juga boleh digunakan oleh pengguna dalam kumpulan pengguna lain.

Yang berikut mentakrifkan pengguna Strim, bertanggungjawab untuk memproses data ekuiti yang diterima

public Flux<String> addScore(long userId) {
    DefaultRedisScript<String> script = new DefaultRedisScript<>();
    script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua")));
    List<String> keys = new ArrayList<>();
    keys.add(String.valueOf(userId));
    keys.add(LocalDateTime.now().toString().substring(0, 10));
    return redisTemplate.execute(script, keys);
}
Salin selepas log masuk

Mari lihat cara menghantar maklumat

local score=redis.call(&#39;hget&#39;,&#39;user:&#39;..KEYS[1],&#39;score&#39;)
local day=redis.call(&#39;hget&#39;,&#39;user:&#39;..KEYS[1],&#39;lastSigninDay&#39;)
if(day==KEYS[2])
    then
    return &#39;0&#39;
else
    redis.call(&#39;hset&#39;,&#39;user:&#39;..KEYS[1],&#39;score&#39;, score+1,&#39;lastSigninDay&#39;,KEYS[2])
    return &#39;1&#39;
end
Salin selepas log masuk

Buat objek rekod mesej ObjectRecord dan lulus ReactiveStreamOperations Hantar rekod mesej.

Sentinel, Cluster

ReactiveRedisTemplate juga menyokong Redis Sentinel dan mod cluster Cluster, anda hanya perlu melaraskan konfigurasi.

Konfigurasi Sentinel adalah seperti berikut

@Component
public class RightsStreamConsumer implements ApplicationRunner, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class);

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container;
    // Stream队列
    private static final String STREAM_KEY = "stream:user:rights";
    // 消费组
    private static final String STREAM_GROUP = "user-service";
    // 消费者
    private static final String STREAM_CONSUMER = "consumer-1";

    @Autowired
    @Qualifier("reactiveRedisTemplate")
    private ReactiveRedisTemplate redisTemplate;

    public void run(ApplicationArguments args) throws Exception {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(100) //一批次拉取的最大count数
                        .executor(Executors.newSingleThreadExecutor())  //线程池
                        .pollTimeout(Duration.ZERO) //阻塞式轮询
                        .targetType(Rights.class) //目标类型(消息内容的类型)
                        .build();
        // 创建一个消息监听容器
        container = StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream
        prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP)
                .subscribe(stream -> {
            // 为Stream创建一个消费者,并绑定处理类
            container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                    StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
                    new StreamMessageListener());
            container.start();
        });
    }

    @Override
    public void destroy() throws Exception {
        container.stop();
    }

    // 查找Stream信息,如果不存在,则创建Stream
    private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) {
        // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。
        return ops.info(stream).onErrorResume(err -> {
            logger.warn("query stream err:{}", err.getMessage());
            // createGroup方法创建Stream
            return ops.createGroup(stream, group).flatMap(s -> ops.info(stream));
        });
    }

    // 消息处理对象
    class  StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> {
        public void onMessage(ObjectRecord<String, Rights> message) {
            // 处理消息
            RecordId id = message.getId();
            Rights rights = message.getValue();
            logger.info("receive id:{},rights:{}", id, rights);
            redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> {
                logger.info("add rights:{}", l);
            });
        }
    }
}
Salin selepas log masuk

Konfigurasi ialah alamat dan port IP nod Sentinel, bukan alamat dan port IP nod contoh Redis. spring.redis.sentinel.nodes

Konfigurasi kluster adalah seperti berikut

public Mono<RecordId> addRights(Rights r) {
    String streamKey = "stream:user:rights";//stream key
    ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r);
    Mono<RecordId> mono = redisTemplate.opsForStream().add(record);
    return mono;
}
Salin selepas log masuk
Contohnya, node2 dalam Redis Cluster ialah nod hamba nod1, dan Lettuce akan menyimpan maklumat ini apabila node1 turun, Redis Cluster akan menaik taraf node2 kepada nod induk. Tetapi Lettuce tidak akan menukar permintaan secara automatik kepada node2 kerana penimbalnya tidak dibilas.

Dayakan konfigurasi
dan Lettuce boleh menyegarkan maklumat cache gugusan Redis Cluster secara kerap, menukar status nod pelanggan secara dinamik dan melengkapkan failover. spring.redis.lettuce.cluster.refresh.adaptive

Pada masa ini tiada penyelesaian untuk ReactiveRedisTemplate untuk melaksanakan saluran paip dan transaksi.

Atas ialah kandungan terperinci Bagaimana untuk melaksanakan interaksi Redis responsif dalam Spring. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:yisu.com
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan