ホームページ > データベース > Redis > Spring でレスポンシブな Redis インタラクションを実装する方法

Spring でレスポンシブな Redis インタラクションを実装する方法

PHPz
リリース: 2023-05-27 17:49:47
転載
1255 人が閲覧しました

この記事では、ユーザー サービスをシミュレートし、データ ストレージ サーバーとして Redis を使用します。
2 つの Java Beans (ユーザーと権利) が関係します

public class User {
    private long id;
    private String name;
    // 标签
    private String label;
    // 收货地址经度
    private Double deliveryAddressLon;
    // 收货地址维度
    private Double deliveryAddressLat;
    // 最新签到日
    private String lastSigninDay;
    // 积分
    private Integer score;
    // 权益
    private List<Rights> rights;
    ...
}

public class Rights {
    private Long id;
    private Long userId;
    private String name;
    ...
}
ログイン後にコピー

起動

依存関係の導入

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>
ログイン後にコピー

Redis 構成の追加

spring.redis.host=192.168.56.102
spring.redis.port=6379
spring.redis.password=
spring.redis.timeout=5000
ログイン後にコピー

SpringBoot の起動

@SpringBootApplication
public class UserServiceReactive {
    public static void main(String[] args) {
        new SpringApplicationBuilder(
                UserServiceReactive.class)
                .web(WebApplicationType.REACTIVE).run(args);
    }
}
ログイン後にコピー

アプリケーションが開始されると、Spring は ReactiveRedisTemplate を自動的に生成します (その基礎となるフレームワークは Lettuce)。
ReactiveRedisTemplate は RedisTemplate に似ていますが、非同期で応答性の高い Redis 対話メソッドを提供します。
リアクティブ プログラミングは非同期であることをもう一度強調します。ReactiveRedisTemplate は Redis リクエストの送信後にスレッドをブロックせず、現在のスレッドは他のタスクを実行できます。
Redis 応答データが返された後、ReactiveRedisTemplate は応答データを処理するスレッドをスケジュールします。
レスポンシブ プログラミングでは、非同期呼び出しを実装し、非同期結果をエレガントな方法で処理できます。これがその最大の重要性です。

シリアル化

ReactiveRedisTemplate でデフォルトで使用されるシリアル化は Jdk シリアル化です。json シリアル化として構成できます

@Bean
public RedisSerializationContext redisSerializationContext() {
    RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();
    builder.key(StringRedisSerializer.UTF_8);
    builder.value(RedisSerializer.json());
    builder.hashKey(StringRedisSerializer.UTF_8);
    builder.hashValue(StringRedisSerializer.UTF_8);

    return builder.build();
}

@Bean
public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
    RedisSerializationContext serializationContext = redisSerializationContext();
    ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext);
    return reactiveRedisTemplate;
}
ログイン後にコピー

builder.hashValue メソッドは Redis リストのシリアル化を指定しますvalue メソッドでは、この記事の Redis リストの値は文字列のみを格納するため、依然として StringRedisSerializer.UTF_8 に設定されます。

基本データ型

ReactiveRedisTemplate は、Redis 文字列、ハッシュ、リスト、セット、順序付きセットなどの基本データ型をサポートします。
この記事では、ハッシュを使用してユーザー情報を保存し、リストを使用してユーザー権限を保存します。この記事では、他の基本的なデータ型の使用については詳しく説明しません。

public Mono<Boolean>  save(User user) {
    ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash();
    Mono<Boolean>  userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user));
    if(user.getRights() != null) {
        ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList();
        opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> {
            logger.info("add rights:{}", l);
        });
    }
    return userRs;
}
ログイン後にコピー

beanToMap メソッドは、User クラスをマップに変換する役割を果たします。

HyperLogLog

Redis HyperLogLog 構造は、コレクション内のさまざまな要素の数をカウントできます。
HyperLogLog を使用して、毎日ログインするユーザーの数をカウントします

public Mono<Long>  login(User user) {
    ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog();
    return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId());
}
ログイン後にコピー

BitMap

Redis BitMap (ビットマップ) は、ビットを使用して要素に対応する値またはステータスを表します。ビットはコンピュータのストレージの最小単位であるため、ビットをストレージとして使用するとスペースを節約できます。
ビットマップを使用して、ユーザーが今週チェックインしたかどうかを記録します

public void addSignInFlag(long userId) {
    String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16);
    redisTemplate.opsForValue().setBit(
            key, userId & 0xffff , true)
    .subscribe(b -> logger.info("set:{},result:{}", key, b));
}
ログイン後にコピー

userId の上位 48 ビットはユーザーを異なるキーに分割するために使用され、下位 16 ビットはビットマップ オフセット パラメーターのオフセットとして使用されます。 。
オフセット パラメーターは 0 以上 2^32 未満である必要があります (ビット マッピングは 512 MB に制限されています)。

Geo

Redis Geo は、地理的位置情報を保存し、地理的位置を計算できます。
指定された範囲内の倉庫情報を探している場合

public Flux getWarehouseInDist(User u, double dist) {
    ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo();
    Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist);
    RedisGeoCommands.GeoRadiusCommandArgs args =
            RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending();
    return geo.radius("warehouse:address", circle, args);
}
ログイン後にコピー

warehouse:address最初に倉庫の地理的位置情報をこのコレクションに保存する必要があります。
ReactiveGeoOperations#radius メソッドは、地理的位置が指定された範囲内にあるコレクション内の要素を検索できます。また、コレクションへの要素の追加や、コレクション内の 2 つの要素間の地理的距離の計算などの操作もサポートします。

Lua

ReactiveRedisTemplate は Lua スクリプトを実行することもできます。
ユーザーのチェックイン ロジックは、以下の Lua スクリプトによって完了します。ユーザーが今日チェックインしていない場合、チェックインが許可され、ポイントが 1 増加します。ユーザーが今日サインインしている場合、操作は拒否されます。

public Flux<String> addScore(long userId) {
    DefaultRedisScript<String> script = new DefaultRedisScript<>();
    script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua")));
    List<String> keys = new ArrayList<>();
    keys.add(String.valueOf(userId));
    keys.add(LocalDateTime.now().toString().substring(0, 10));
    return redisTemplate.execute(script, keys);
}
ログイン後にコピー

signin.lua の内容は次のとおりです

local score=redis.call(&#39;hget&#39;,&#39;user:&#39;..KEYS[1],&#39;score&#39;)
local day=redis.call(&#39;hget&#39;,&#39;user:&#39;..KEYS[1],&#39;lastSigninDay&#39;)
if(day==KEYS[2])
    then
    return &#39;0&#39;
else
    redis.call(&#39;hset&#39;,&#39;user:&#39;..KEYS[1],&#39;score&#39;, score+1,&#39;lastSigninDay&#39;,KEYS[2])
    return &#39;1&#39;
end
ログイン後にコピー

Stream

Redis Stream は、Redis 5.0 バージョンで新しく追加されたデータ型です。このタイプは、メッセージ キューを実装し、メッセージの永続化とプライマリとスタンバイのレプリケーション機能を提供し、各クライアントのアクセス場所を記憶してメッセージが失われないようにすることができます。

Redis は kafka の設計を利用しており、ストリーム内に複数のコンシューマ グループが存在でき、1 つのコンシューマ グループ内に複数のコンシューマが存在できます。
コンシューマ グループ内のコンシューマがストリーム内のメッセージを消費する場合、そのメッセージはコンシューマ グループ内の他のコンシューマによって消費されません。もちろん、他のコンシューマ グループ内のコンシューマによって消費されることもあります。

受信した株式データの処理を担当するストリーム コンシューマを以下に定義します。

@Component
public class RightsStreamConsumer implements ApplicationRunner, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class);

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container;
    // Stream队列
    private static final String STREAM_KEY = "stream:user:rights";
    // 消费组
    private static final String STREAM_GROUP = "user-service";
    // 消费者
    private static final String STREAM_CONSUMER = "consumer-1";

    @Autowired
    @Qualifier("reactiveRedisTemplate")
    private ReactiveRedisTemplate redisTemplate;

    public void run(ApplicationArguments args) throws Exception {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(100) //一批次拉取的最大count数
                        .executor(Executors.newSingleThreadExecutor())  //线程池
                        .pollTimeout(Duration.ZERO) //阻塞式轮询
                        .targetType(Rights.class) //目标类型(消息内容的类型)
                        .build();
        // 创建一个消息监听容器
        container = StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream
        prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP)
                .subscribe(stream -> {
            // 为Stream创建一个消费者,并绑定处理类
            container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                    StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
                    new StreamMessageListener());
            container.start();
        });
    }

    @Override
    public void destroy() throws Exception {
        container.stop();
    }

    // 查找Stream信息,如果不存在,则创建Stream
    private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) {
        // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。
        return ops.info(stream).onErrorResume(err -> {
            logger.warn("query stream err:{}", err.getMessage());
            // createGroup方法创建Stream
            return ops.createGroup(stream, group).flatMap(s -> ops.info(stream));
        });
    }

    // 消息处理对象
    class  StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> {
        public void onMessage(ObjectRecord<String, Rights> message) {
            // 处理消息
            RecordId id = message.getId();
            Rights rights = message.getValue();
            logger.info("receive id:{},rights:{}", id, rights);
            redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> {
                logger.info("add rights:{}", l);
            });
        }
    }
}
ログイン後にコピー

情報を送信する方法を見てみましょう

public Mono<RecordId> addRights(Rights r) {
    String streamKey = "stream:user:rights";//stream key
    ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r);
    Mono<RecordId> mono = redisTemplate.opsForStream().add(record);
    return mono;
}
ログイン後にコピー

メッセージ レコード オブジェクト ObjectRecord を作成し、 pass ReactiveStreamOperations メッセージ レコードを送信します。

Sentinel、Cluster

ReactiveRedisTemplate は、Redis Sentinel および Cluster クラスター モードもサポートしているため、構成を調整するだけで済みます。
Sentinel の設定は次のとおりです

spring.redis.sentinel.master=mymaster
spring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379
spring.redis.sentinel.password=
ログイン後にコピー

spring.redis.sentinel.nodes設定は Sentinel ノードの IP アドレスとポートであり、Redis インスタンス ノードの IP アドレスとポートではありません。

クラスター構成は次のとおりです

spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379
spring.redis.lettuce.cluster.refresh.period=10000
spring.redis.lettuce.cluster.refresh.adaptive=true
ログイン後にコピー

たとえば、Redis クラスターのノード 2 はノード 1 のスレーブ ノードであり、レタスはこの情報をキャッシュします。ノード 1 がダウンすると、Redis クラスターはノード 2 を次のようにアップグレードします。マスターノード。ただし、レタスのバッファはフラッシュされないため、レタスはリクエストを自動的にノード 2 に切り替えません。
Openspring.redis.lettuce.cluster.refresh.adaptive構成では、Lettuce は Redis Cluster クラスター キャッシュ情報を定期的に更新し、クライアントのノード ステータスを動的に変更し、フェールオーバーを完了できます。

現時点では、ReactiveRedisTemplate にパイプラインとトランザクションを実装するためのソリューションはありません。

以上がSpring でレスポンシブな Redis インタラクションを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:yisu.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート