この記事では、ユーザー サービスをシミュレートし、データ ストレージ サーバーとして Redis を使用します。
2 つの Java Beans (ユーザーと権利) が関係します
public class User { private long id; private String name; // 标签 private String label; // 收货地址经度 private Double deliveryAddressLon; // 收货地址维度 private Double deliveryAddressLat; // 最新签到日 private String lastSigninDay; // 积分 private Integer score; // 权益 private List<Rights> rights; ... } public class Rights { private Long id; private Long userId; private String name; ... }
依存関係の導入
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
Redis 構成の追加
spring.redis.host=192.168.56.102 spring.redis.port=6379 spring.redis.password= spring.redis.timeout=5000
SpringBoot の起動
@SpringBootApplication public class UserServiceReactive { public static void main(String[] args) { new SpringApplicationBuilder( UserServiceReactive.class) .web(WebApplicationType.REACTIVE).run(args); } }
アプリケーションが開始されると、Spring は ReactiveRedisTemplate を自動的に生成します (その基礎となるフレームワークは Lettuce)。
ReactiveRedisTemplate は RedisTemplate に似ていますが、非同期で応答性の高い Redis 対話メソッドを提供します。
リアクティブ プログラミングは非同期であることをもう一度強調します。ReactiveRedisTemplate は Redis リクエストの送信後にスレッドをブロックせず、現在のスレッドは他のタスクを実行できます。
Redis 応答データが返された後、ReactiveRedisTemplate は応答データを処理するスレッドをスケジュールします。
レスポンシブ プログラミングでは、非同期呼び出しを実装し、非同期結果をエレガントな方法で処理できます。これがその最大の重要性です。
ReactiveRedisTemplate でデフォルトで使用されるシリアル化は Jdk シリアル化です。json シリアル化として構成できます
@Bean public RedisSerializationContext redisSerializationContext() { RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext(); builder.key(StringRedisSerializer.UTF_8); builder.value(RedisSerializer.json()); builder.hashKey(StringRedisSerializer.UTF_8); builder.hashValue(StringRedisSerializer.UTF_8); return builder.build(); } @Bean public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) { RedisSerializationContext serializationContext = redisSerializationContext(); ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext); return reactiveRedisTemplate; }
builder.hashValue メソッドは Redis リストのシリアル化を指定しますvalue メソッドでは、この記事の Redis リストの値は文字列のみを格納するため、依然として StringRedisSerializer.UTF_8 に設定されます。
ReactiveRedisTemplate は、Redis 文字列、ハッシュ、リスト、セット、順序付きセットなどの基本データ型をサポートします。
この記事では、ハッシュを使用してユーザー情報を保存し、リストを使用してユーザー権限を保存します。この記事では、他の基本的なデータ型の使用については詳しく説明しません。
public Mono<Boolean> save(User user) { ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash(); Mono<Boolean> userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user)); if(user.getRights() != null) { ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList(); opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> { logger.info("add rights:{}", l); }); } return userRs; }
beanToMap メソッドは、User クラスをマップに変換する役割を果たします。
Redis HyperLogLog 構造は、コレクション内のさまざまな要素の数をカウントできます。
HyperLogLog を使用して、毎日ログインするユーザーの数をカウントします
public Mono<Long> login(User user) { ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog(); return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId()); }
Redis BitMap (ビットマップ) は、ビットを使用して要素に対応する値またはステータスを表します。ビットはコンピュータのストレージの最小単位であるため、ビットをストレージとして使用するとスペースを節約できます。
ビットマップを使用して、ユーザーが今週チェックインしたかどうかを記録します
public void addSignInFlag(long userId) { String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16); redisTemplate.opsForValue().setBit( key, userId & 0xffff , true) .subscribe(b -> logger.info("set:{},result:{}", key, b)); }
userId の上位 48 ビットはユーザーを異なるキーに分割するために使用され、下位 16 ビットはビットマップ オフセット パラメーターのオフセットとして使用されます。 。
オフセット パラメーターは 0 以上 2^32 未満である必要があります (ビット マッピングは 512 MB に制限されています)。
Redis Geo は、地理的位置情報を保存し、地理的位置を計算できます。
指定された範囲内の倉庫情報を探している場合
public Flux getWarehouseInDist(User u, double dist) { ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo(); Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist); RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending(); return geo.radius("warehouse:address", circle, args); }
warehouse:address
最初に倉庫の地理的位置情報をこのコレクションに保存する必要があります。
ReactiveGeoOperations#radius メソッドは、地理的位置が指定された範囲内にあるコレクション内の要素を検索できます。また、コレクションへの要素の追加や、コレクション内の 2 つの要素間の地理的距離の計算などの操作もサポートします。
ReactiveRedisTemplate は Lua スクリプトを実行することもできます。
ユーザーのチェックイン ロジックは、以下の Lua スクリプトによって完了します。ユーザーが今日チェックインしていない場合、チェックインが許可され、ポイントが 1 増加します。ユーザーが今日サインインしている場合、操作は拒否されます。
public Flux<String> addScore(long userId) { DefaultRedisScript<String> script = new DefaultRedisScript<>(); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua"))); List<String> keys = new ArrayList<>(); keys.add(String.valueOf(userId)); keys.add(LocalDateTime.now().toString().substring(0, 10)); return redisTemplate.execute(script, keys); }
signin.lua の内容は次のとおりです
local score=redis.call('hget','user:'..KEYS[1],'score') local day=redis.call('hget','user:'..KEYS[1],'lastSigninDay') if(day==KEYS[2]) then return '0' else redis.call('hset','user:'..KEYS[1],'score', score+1,'lastSigninDay',KEYS[2]) return '1' end
Redis Stream は、Redis 5.0 バージョンで新しく追加されたデータ型です。このタイプは、メッセージ キューを実装し、メッセージの永続化とプライマリとスタンバイのレプリケーション機能を提供し、各クライアントのアクセス場所を記憶してメッセージが失われないようにすることができます。
Redis は kafka の設計を利用しており、ストリーム内に複数のコンシューマ グループが存在でき、1 つのコンシューマ グループ内に複数のコンシューマが存在できます。
コンシューマ グループ内のコンシューマがストリーム内のメッセージを消費する場合、そのメッセージはコンシューマ グループ内の他のコンシューマによって消費されません。もちろん、他のコンシューマ グループ内のコンシューマによって消費されることもあります。
受信した株式データの処理を担当するストリーム コンシューマを以下に定義します。
@Component public class RightsStreamConsumer implements ApplicationRunner, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class); @Autowired private RedisConnectionFactory redisConnectionFactory; private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container; // Stream队列 private static final String STREAM_KEY = "stream:user:rights"; // 消费组 private static final String STREAM_GROUP = "user-service"; // 消费者 private static final String STREAM_CONSUMER = "consumer-1"; @Autowired @Qualifier("reactiveRedisTemplate") private ReactiveRedisTemplate redisTemplate; public void run(ApplicationArguments args) throws Exception { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(100) //一批次拉取的最大count数 .executor(Executors.newSingleThreadExecutor()) //线程池 .pollTimeout(Duration.ZERO) //阻塞式轮询 .targetType(Rights.class) //目标类型(消息内容的类型) .build(); // 创建一个消息监听容器 container = StreamMessageListenerContainer.create(redisConnectionFactory, options); // prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP) .subscribe(stream -> { // 为Stream创建一个消费者,并绑定处理类 container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER), StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()), new StreamMessageListener()); container.start(); }); } @Override public void destroy() throws Exception { container.stop(); } // 查找Stream信息,如果不存在,则创建Stream private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) { // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。 return ops.info(stream).onErrorResume(err -> { logger.warn("query stream err:{}", err.getMessage()); // createGroup方法创建Stream return ops.createGroup(stream, group).flatMap(s -> ops.info(stream)); }); } // 消息处理对象 class StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> { public void onMessage(ObjectRecord<String, Rights> message) { // 处理消息 RecordId id = message.getId(); Rights rights = message.getValue(); logger.info("receive id:{},rights:{}", id, rights); redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> { logger.info("add rights:{}", l); }); } } }
情報を送信する方法を見てみましょう
public Mono<RecordId> addRights(Rights r) { String streamKey = "stream:user:rights";//stream key ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r); Mono<RecordId> mono = redisTemplate.opsForStream().add(record); return mono; }
メッセージ レコード オブジェクト ObjectRecord を作成し、 pass ReactiveStreamOperations メッセージ レコードを送信します。
ReactiveRedisTemplate は、Redis Sentinel および Cluster クラスター モードもサポートしているため、構成を調整するだけで済みます。
Sentinel の設定は次のとおりです
spring.redis.sentinel.master=mymaster spring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379 spring.redis.sentinel.password=
spring.redis.sentinel.nodes
設定は Sentinel ノードの IP アドレスとポートであり、Redis インスタンス ノードの IP アドレスとポートではありません。
クラスター構成は次のとおりです
spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379 spring.redis.lettuce.cluster.refresh.period=10000 spring.redis.lettuce.cluster.refresh.adaptive=true
たとえば、Redis クラスターのノード 2 はノード 1 のスレーブ ノードであり、レタスはこの情報をキャッシュします。ノード 1 がダウンすると、Redis クラスターはノード 2 を次のようにアップグレードします。マスターノード。ただし、レタスのバッファはフラッシュされないため、レタスはリクエストを自動的にノード 2 に切り替えません。
Openspring.redis.lettuce.cluster.refresh.adaptive
構成では、Lettuce は Redis Cluster クラスター キャッシュ情報を定期的に更新し、クライアントのノード ステータスを動的に変更し、フェールオーバーを完了できます。
現時点では、ReactiveRedisTemplate にパイプラインとトランザクションを実装するためのソリューションはありません。
以上がSpring でレスポンシブな Redis インタラクションを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。