Je me retrouve souvent dans des situations où j'ai besoin de mettre en cache ceci ou cela. Souvent, ces valeurs sont mises en cache pendant un certain temps. Vous connaissez probablement le modèle. Vous essayez d'obtenir une valeur du cache, si vous réussissez, vous la renvoyez à l'appelant et l'appelez un jour. Si la valeur n'est pas là, vous la récupérez (très probablement à partir de la base de données) ou la calculez et la mettez dans le cache. Dans la plupart des cas, cela fonctionne très bien. Cependant, si la clé que vous utilisez pour votre entrée de cache est fréquemment consultée et que l'opération de calcul des données prend un certain temps, vous vous retrouverez dans une situation où plusieurs requêtes parallèles obtiendront simultanément un échec de cache. Toutes ces requêtes chargeront indépendamment la source et stockeront la valeur dans le cache. Cela entraîne un gaspillage de ressources et peut même conduire à un déni de service.
Permettez-moi d'illustrer avec un exemple. J'utiliserai Redis pour le cache et un simple serveur Go http en plus. Voici le code complet :
package main import ( "errors" "log" "net/http" "time" "github.com/redis/go-redis/v9" ) type handler struct { rdb *redis.Client cacheTTL time.Duration } func (ch *handler) simple(w http.ResponseWriter, r *http.Request) { cacheKey := "my_cache_key" // we'll use 200 to signify a cache hit & 201 to signify a miss responseCode := http.StatusOK cachedData, err := ch.rdb.Get(r.Context(), cacheKey).Result() if err != nil { if !errors.Is(err, redis.Nil) { log.Println("could not reach redis", err.Error()) http.Error(w, "could not reach redis", http.StatusInternalServerError) return } // cache miss - fetch & store res := longRunningOperation() responseCode = http.StatusCreated err = ch.rdb.Set(r.Context(), cacheKey, res, ch.cacheTTL).Err() if err != nil { log.Println("failed to set cache value", err.Error()) http.Error(w, "failed to set cache value", http.StatusInternalServerError) return } cachedData = res } w.WriteHeader(responseCode) _, _ = w.Write([]byte(cachedData)) } func longRunningOperation() string { time.Sleep(time.Millisecond * 500) return "hello" } func main() { ttl := time.Second * 3 rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", }) handler := &handler{ rdb: rdb, cacheTTL: ttl, } http.HandleFunc("/simple", handler.simple) if err := http.ListenAndServe(":8080", nil); err != nil { log.Fatalf("Could not start server: %s\n", err.Error()) } }
Mettons un peu de charge sur le point de terminaison /simple et voyons ce qui se passe. J'utiliserai Vegeta pour ça.
Je lance une attaque Vegeta -duration=30s -rate=500 -targets=./targets_simple.txt > res_simple.bin. Vegeta finit par faire 500 requêtes chaque seconde pendant 30 secondes. Je les représente sous forme d'histogramme de codes de résultat HTTP avec des compartiments d'une durée de 100 ms chacun. Le résultat est le graphique suivant.
Lorsque nous démarrons l'expérience, le cache est vide - nous n'y avons aucune valeur stockée. Nous obtenons la bousculade initiale alors qu’un tas de demandes atteignent notre serveur. Tous vérifient le cache et n'y trouvent rien, appellent longRunningOperation et le stockent dans le cache. Étant donné que longRunningOperation prend environ 500 ms pour terminer, toutes les requêtes effectuées au cours des 500 premières ms finissent par appeler longRunningOperation. Une fois que l'une des requêtes parvient à stocker la valeur dans le cache, toutes les requêtes suivantes la récupèrent du cache et nous commençons à voir des réponses avec le code d'état de 200. Le modèle se répète ensuite toutes les 3 secondes lorsque le mécanisme d'expiration sur Redis entre en jeu.
Dans cet exemple de jouet, cela ne pose aucun problème, mais dans un environnement de production, cela peut entraîner une charge inutile sur vos systèmes, une expérience utilisateur dégradée ou même un déni de service auto-induit. Alors, comment pouvons-nous empêcher cela ? Eh bien, il existe plusieurs façons. Nous pourrions introduire un verrou - tout échec de cache entraînerait une tentative de code d'obtenir un verrou. Le verrouillage distribué n'est pas une chose triviale à faire et il présente souvent des cas extrêmes subtils qui nécessitent une manipulation délicate. Nous pourrions également recalculer périodiquement la valeur à l'aide d'un travail en arrière-plan, mais cela nécessite l'exécution d'un processus supplémentaire, introduisant encore un autre rouage qui doit être maintenu et surveillé dans notre code. Cette approche peut également ne pas être réalisable si vous disposez de clés de cache dynamiques. Il existe une autre approche, appelée expiration anticipée probabiliste, et c'est quelque chose que j'aimerais approfondir.
Cette technique permet de recalculer la valeur en fonction d'une probabilité. Lors de la récupération de la valeur du cache, vous calculez également si vous devez régénérer la valeur du cache en fonction d'une probabilité. Plus vous êtes proche de l'expiration de la valeur existante, plus la probabilité est élevée.
Je base l'implémentation spécifique sur XFetch par A. Vattani, F.Chierichetti et K. Lowenstein dans Optimal Probabilistic Cache Stampede Prevention.
Je vais introduire un nouveau point de terminaison sur le serveur HTTP qui effectuera également le calcul coûteux mais cette fois utilisera XFetch lors de la mise en cache. Pour que XFetch fonctionne, nous devons stocker la durée de l'opération coûteuse (le delta) et la date d'expiration de la clé de cache. Pour y parvenir, je vais introduire une structure qui contiendra ces valeurs ainsi que le message lui-même :
type probabilisticValue struct { Message string Expiry time.Time Delta time.Duration }
J'ajoute une fonction pour envelopper le message d'origine avec ces attributs et le sérialiser pour le stocker dans Redis :
func wrapMessage(message string, delta, cacheTTL time.Duration) (string, error) { bts, err := json.Marshal(probabilisticValue{ Message: message, Delta: delta, Expiry: time.Now().Add(cacheTTL), }) if err != nil { return "", fmt.Errorf("could not marshal message: %w", err) } return string(bts), nil }
Écrivons également une méthode pour recalculer et stocker la valeur dans redis :
func (ch *handler) recomputeValue(ctx context.Context, cacheKey string) (string, error) { start := time.Now() message := longRunningOperation() delta := time.Since(start) wrapped, err := wrapMessage(message, delta, ch.cacheTTL) if err != nil { return "", fmt.Errorf("could not wrap message: %w", err) } err = ch.rdb.Set(ctx, cacheKey, wrapped, ch.cacheTTL).Err() if err != nil { return "", fmt.Errorf("could not save value: %w", err) } return message, nil }
Pour déterminer si nous devons mettre à jour la valeur en fonction de la probabilité, nous pouvons ajouter une méthode à probabilisticValue :
func (pv probabilisticValue) shouldUpdate() bool { // suggested default param in XFetch implementation // if increased - results in earlier expirations beta := 1.0 now := time.Now() scaledGap := pv.Delta.Seconds() * beta * math.Log(rand.Float64()) return now.Sub(pv.Expiry).Seconds() >= scaledGap }
Si nous connectons tout cela, nous nous retrouvons avec le gestionnaire suivant :
func (ch *handler) probabilistic(w http.ResponseWriter, r *http.Request) { cacheKey := "probabilistic_cache_key" // we'll use 200 to signify a cache hit & 201 to signify a miss responseCode := http.StatusOK cachedData, err := ch.rdb.Get(r.Context(), cacheKey).Result() if err != nil { if !errors.Is(err, redis.Nil) { log.Println("could not reach redis", err.Error()) http.Error(w, "could not reach redis", http.StatusInternalServerError) return } res, err := ch.recomputeValue(r.Context(), cacheKey) if err != nil { log.Println("could not recompute value", err.Error()) http.Error(w, "could not recompute value", http.StatusInternalServerError) return } responseCode = http.StatusCreated cachedData = res w.WriteHeader(responseCode) _, _ = w.Write([]byte(cachedData)) return } pv := probabilisticValue{} err = json.Unmarshal([]byte(cachedData), &pv) if err != nil { log.Println("could not unmarshal probabilistic value", err.Error()) http.Error(w, "could not unmarshal probabilistic value", http.StatusInternalServerError) return } if pv.shouldUpdate() { _, err := ch.recomputeValue(r.Context(), cacheKey) if err != nil { log.Println("could not recompute value", err.Error()) http.Error(w, "could not recompute value", http.StatusInternalServerError) return } responseCode = http.StatusAccepted } w.WriteHeader(responseCode) _, _ = w.Write([]byte(cachedData)) }
Le gestionnaire fonctionne un peu comme le premier, cependant, dès que nous obtenons un accès au cache, nous lançons les dés. En fonction du résultat, soit nous renvoyons simplement la valeur que nous venons de récupérer, soit nous mettons à jour la valeur plus tôt.
Nous utiliserons les codes de statut HTTP pour déterminer entre les 3 cas :
Je redémarre Vegeta cette fois en exécutant le nouveau point final et voici le résultat :
여기에 있는 작은 파란색 얼룩은 실제로 캐시 값을 일찍 업데이트한 시기를 나타냅니다. 초기 워밍업 기간 이후에는 더 이상 캐시 누락이 표시되지 않습니다. 초기 급증을 방지하려면 사용 사례에 중요한 경우 캐시된 값을 미리 저장할 수 있습니다.
캐싱을 더욱 적극적으로 사용하고 값을 더 자주 새로 고치고 싶다면 베타 매개변수를 사용해 보세요. 베타 매개변수를 2로 설정한 동일한 실험은 다음과 같습니다.
이제 확률적 업데이트가 훨씬 더 자주 표시됩니다.
이 모든 것은 캐시 압류를 방지하는 데 도움이 될 수 있는 깔끔하고 작은 기술입니다. 하지만 이는 캐시에서 동일한 키를 주기적으로 가져오는 경우에만 작동한다는 점을 명심하세요. 그렇지 않으면 큰 이점을 볼 수 없습니다.
캐시 폭주를 처리하는 다른 방법이 있나요? 실수를 발견하셨나요? 아래 댓글로 알려주세요!
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!