這篇文章為大家帶來了關於Redis的相關知識,其中主要介紹的是分散式快取和本地快取的使用技巧,包括快取種類介紹,各種的使用場景,以及如何使用,最後再給實戰案例,下面一起來看一下,希望對大家有幫助。
推薦學習:Redis影片教學
#眾所周知,快取最主要的目的是加速訪問,緩解資料庫壓力。最常用的緩存就是分散式緩存,例如redis,在面對大部分並發場景或一些中小型公司流量沒有那麼高的情況,使用redis基本上都能解決了。但在流量較高的情況下可能得使用到本機快取了,例如guava的LoadingCache和快手開源的ReloadableCache。
這部分會介紹redis,像是guava的LoadingCache和快手開源的ReloadableCache的使用場景和限制,透過這部分的介紹就能知道在怎樣的業務場景下應該使用哪種緩存,以及為什麼。
如果寬泛的說redis何時使用,那麼自然就是用戶訪問量過高的地方使用,從而加速訪問,並且緩解資料庫壓力。如果細分的話,還得分為單節點問題和非單節點問題。
如果一個頁面使用者訪問量比較高,但是訪問的不是同一個資源。例如使用者詳情頁,訪問量比較高,但每個使用者的資料都是不一樣的,這種情況顯然只能用分散式快取了,如果使用redis,key為使用者唯一鍵,value則是使用者資訊。
redis導致的快取擊穿。
但是要注意一點,一定要設定過期時間,而且不能設定到同一時間點過期。舉個例子,例如用戶又個活動頁,活動頁能看到用戶活動期間獲獎數據,粗心的人可能會設定用戶數據的過期時間點為活動結束,這樣會
#單(熱)點問題
單節點問題說的是redis的單一節點的並發問題,因為對於相同的key會落到redis集群的同一個節點上,那麼如果對這個key的訪問量過高,那麼這個redis節點就存在並發隱患,這個key就稱為熱key。
如果所有使用者存取的都是同一個資源,例如小愛同學app首頁對所有使用者展示的內容都一樣(初期),服務端給h5返回的是同一個大json,顯然得使用到緩存。首先我們考慮下用redis是否可行,由於redis存在單點問題,如果流量過大的話,那麼所有用戶的請求到達redis的同一個節點,需要評估該節點能否抗住這麼大流量。我們的規則是,如果單節點qps達到了千級就要解決單點問題了(即使redis號稱能抗住十萬級的qps),最常見的做法就是使用本地快取。顯然小愛app首頁流量不過百,使用redis是沒問題的。
對於這上面說的熱key問題,我們最直接的做法就是使用本地緩存,例如你最熟悉的guava的LoadingCache,但是使用本地快取要求能夠接受一定的髒數據,因為如果你更新了首頁,本地快取是不會更新的,它只會根據一定的過期策略來重新加載緩存,不過在我們這個場景是完全沒問題的,因為一旦在後台推送了首頁後就不會再去改變了。即使改變了也沒問題,可以設定寫過期為半小時,超過半小時重新加載緩存,這種短時間內的髒數據我們是可以接受的。
LoadingCache導致的快取擊穿
雖然說本地快取和機器上強相關的,雖然程式碼層面寫的是半小時過期,但由於每台機器的啟動時間不同,導致快取的載入時間不同,過期時間也就不同,也就不會所有機器上的請求在同一時間快取失效後都去請求資料庫。但對於單一一台機器也是會導致快取穿透的,假如有10台機器,每台1000的qps,只要有一台快取過期就可能導致這1000個請求同時打到了資料庫。這個問題其實比較好解決,但是容易被忽略,也就是在設定LoadingCache的時候使用LoadingCache的load-miss方法,而不是直接判斷cache.getIfPresent()== null然後去請求db;前者會加虛擬機層面的鎖,保證只有一個請求打到資料庫去,從而完美的解決了這個問題。
但是,如果對於即時性要求較高的情況,例如有段時間要經常做活動,我要保證活動頁面能近實時更新,也就是運營在後台配置好了活動信息後,需要在C端近即時展示這次配置的活動訊息,此時使用LoadingCache肯定就不能滿足了。
對於上面說的LoadingCache不能解決的即時問題,可以考慮使用ReloadableCache,這是快手開源的一個本機快取框架,最大的特點是支援多機器同時更新緩存,假設我們修改了首頁信息,然後請求打到的是A機器,這個時候重新加載ReloadableCache,然後它會發出通知,監聽了同一zk節點的其他機器收到通知後重新更新緩存。使用這個緩存一般的要求是將全量資料載入到本地緩存,所以如果資料量過大肯定會對gc造成壓力,這種情況就不能使用了。由於小愛同學首頁這個首頁是帶有狀態的,一般online狀態的就那麼兩個,所以完全可以使用ReloadableCache來只裝載online狀態的首頁。
到這裡三種快取基本上都介紹完了,做個小結:
;要嘛使用build的時候使用的是
build(CacheLoader super K1, V1> loader)這個時候可以直接使用get( )了。另外建議使用load-miss,而不是getIfPresent==null的時候再去查資料庫,這可能會導致快取擊穿;
LoadingCache<String, String> cache = CacheBuilder.newBuilder() .maximumSize(1000L) .expireAfterAccess(Duration.ofHours(1L)) // 多久不访问就过期 .expireAfterWrite(Duration.ofHours(1L)) // 多久这个key没修改就过期 .build(new CacheLoader<String, String>() { @Override public String load(String key) throws Exception { // 数据装载方式,一般就是loadDB return key + " world"; } }); String value = cache.get("hello"); // 返回hello world
<dependency> <groupId>com.github.phantomthief</groupId> <artifactId>zknotify-cache</artifactId> <version>0.1.22</version> </dependency>
public interface ReloadableCache<T> extends Supplier<T> { /** * 获取缓存数据 */ @Override T get(); /** * 通知全局缓存更新 * 注意:如果本地缓存没有初始化,本方法并不会初始化本地缓存并重新加载 * * 如果需要初始化本地缓存,请先调用 {@link ReloadableCache#get()} */ void reload(); /** * 更新本地缓存的本地副本 * 注意:如果本地缓存没有初始化,本方法并不会初始化并刷新本地的缓存 * * 如果需要初始化本地缓存,请先调用 {@link ReloadableCache#get()} */ void reloadLocal(); }
import com.google.common.collect.Lists; import org.apache.commons.collections4.CollectionUtils; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; /** * @Description 分布式加载缓存的rpc服务,如果部署了多台机器那么调用端最好使用id做一致性hash保证相同id的请求打到同一台机器。 **/ public abstract class AbstractCacheSetterService implements CacheSetterService { private final ConcurrentMap<String, CountDownLatch> loadCache = new ConcurrentHashMap<>(); private final Object lock = new Object(); @Override public void load(Collection<String> needLoadIds) { if (CollectionUtils.isEmpty(needLoadIds)) { return; } CountDownLatch latch; Collection<CountDownLatch> loadingLatchList; synchronized (lock) { loadingLatchList = excludeLoadingIds(needLoadIds); needLoadIds = Collections.unmodifiableCollection(needLoadIds); latch = saveLatch(needLoadIds); } System.out.println("needLoadIds:" + needLoadIds); try { if (CollectionUtils.isNotEmpty(needLoadIds)) { loadCache(needLoadIds); } } finally { release(needLoadIds, latch); block(loadingLatchList); } } /** * 加锁 * @param loadingLatchList 需要加锁的id对应的CountDownLatch */ protected void block(Collection<CountDownLatch> loadingLatchList) { if (CollectionUtils.isEmpty(loadingLatchList)) { return; } System.out.println("block:" + loadingLatchList); loadingLatchList.forEach(l -> { try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } }); } /** * 释放锁 * @param needLoadIds 需要释放锁的id集合 * @param latch 通过该CountDownLatch来释放锁 */ private void release(Collection<String> needLoadIds, CountDownLatch latch) { if (CollectionUtils.isEmpty(needLoadIds)) { return; } synchronized (lock) { needLoadIds.forEach(id -> loadCache.remove(id)); } if (latch != null) { latch.countDown(); } } /** * 加载缓存,比如根据id从db查询数据,然后设置到redis中 * @param needLoadIds 加载缓存的id集合 */ protected abstract void loadCache(Collection<String> needLoadIds); /** * 对需要加载缓存的id绑定CountDownLatch,后续相同的id请求来了从map中找到CountDownLatch,并且await,直到该线程加载完了缓存 * @param needLoadIds 能够正在去加载缓存的id集合 * @return 公用的CountDownLatch */ protected CountDownLatch saveLatch(Collection<String> needLoadIds) { if (CollectionUtils.isEmpty(needLoadIds)) { return null; } CountDownLatch latch = new CountDownLatch(1); needLoadIds.forEach(loadId -> loadCache.put(loadId, latch)); System.out.println("loadCache:" + loadCache); return latch; } /** * 哪些id正在加载数据,此时持有相同id的线程需要等待 * @param ids 需要加载缓存的id集合 * @return 正在加载的id所对应的CountDownLatch集合 */ private Collection<CountDownLatch> excludeLoadingIds(Collection<String> ids) { List<CountDownLatch> loadingLatchList = Lists.newArrayList(); Iterator<String> iterator = ids.iterator(); while (iterator.hasNext()) { String id = iterator.next(); CountDownLatch latch = loadCache.get(id); if (latch != null) { loadingLatchList.add(latch); iterator.remove(); } } System.out.println("loadingLatchList:" + loadingLatchList); return loadingLatchList; } }
import java.util.Collection; public class BizCacheSetterRpcService extends AbstractCacheSetterService { @Override protected void loadCache(Collection<String> needLoadIds) { // 读取db进行处理 // 设置缓存 } }
就是大量快取集中失效打到了db,當然肯定都是一類的業務緩存,歸根到底是程式碼寫的有問題。可以將快取失效的過期時間打散,別讓其集中失效就可以了。
推薦學習:Redis影片教學
以上是高並發技巧之Redis和本地快取使用技巧分享的詳細內容。更多資訊請關注PHP中文網其他相關文章!