寫一段簡單正常的超賣邏輯程式碼,多個使用者同時操作同一段數據,探究出現的問題。
Redis中儲存一項資料訊息,請求對應接口,取得商品數量資訊;
商品數量資訊如果大於0,則扣減1,重新儲存Redis中;
執行程式碼測試問題。
/** * Redis数据库操作,超卖问题模拟 * @author * */ @RestController public class RedisController { // 引入String类型redis操作模板 @Autowired private StringRedisTemplate stringRedisTemplate; // 测试数据设置接口 @RequestMapping("/setStock") public String setStock() { stringRedisTemplate.opsForValue().set("stock", "100"); return "ok"; } // 模拟商品超卖代码 @RequestMapping("/deductStock") public String deductStock() { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:"+realStock); }else { System.out.println("库存不足....."); } return "end"; } }
在單一應用模式下,使用jmeter
壓測。
測試結果:
每個請求都相當於一個線程,當幾個線程同時拿到資料時,線程A拿到庫存為84,這個時候線程B也進入程序,並且搶佔了CPU,訪問庫存為84,最後兩個線程都對庫存減一,導致最後修改為83,實際上多賣了一件
既然線程和線程之間,數據處理不一致,能否使用synchronized
加鎖測試?
依舊還是先測試單一伺服器
// 模拟商品超卖代码, // 设置synchronized同步锁 @RequestMapping("/deductStock1") public String deductStock1() { synchronized (this) { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:"+realStock); }else { System.out.println("库存不足....."); } } return "end"; }
數量100
##重新壓測,得到的日誌資訊如下所示:
在單機模式下,加入synchronized關鍵字,的確能夠避免商品的超賣現象!
但是在分散式微服務中,針對該服務設定了集群,synchronized依舊還能保證資料的正確性嗎?
假設多個請求,被註冊中心負載平衡,每個微服務中的該處理接口,都添加有synchronized,
超賣問題:
的synchronized
單一伺服器
只是針對
JVM
進行加鎖,但是分散式是很多個不同的伺服器,導致兩個執行緒或多個在不同伺服器上共同對商品數量資訊做了操作!
setnx (set if not exists)Redis實作分散式鎖定
在Redis中存在一條指令
#setnx key value
如果不存在key,則可以設定成功;否則設定失敗。
修改處理接口,增加key
<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">// 模拟商品超卖代码
@RequestMapping("/deductStock2")
public String deductStock2() {
// 创建一个key,保存至redis
String key = "lock";
// setnx
// 由于redis是一个单线程,执行命令采取“队列”形式排队!
// 优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败。
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock");
// 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false
if (!result) {
// 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!
return "err";
}
// 获取Redis数据库中的商品数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
// 减库存
if(stock > 0) {
int realStock = stock -1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
System.out.println("商品扣减成功,剩余商品:"+realStock);
}else {
System.out.println("库存不足.....");
}
// 程序执行完成,则删除这个key
stringRedisTemplate.delete(key);
return "end";
}</pre><div class="contentsignin">登入後複製</div></div>
2、當創建成功後,執行搶購邏輯。
3、搶購邏輯執行完成後,刪除資料庫中對應的setnx
的key。讓其他請求能夠設定並操作。
這種邏輯來說比之前單一使用
syn
合理的多,但是如果執行搶購操作中出現了異常,導致這個key
無法被
,程式邏輯死鎖! 可以採取try … finally進行操作
/** * 模拟商品超卖代码 设置 * * @return */ @RequestMapping("/deductStock3") public String deductStock3() { // 创建一个key,保存至redis String key = "lock"; // setnx // 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 stringRedisTemplate.delete(key); } return "end"; }
這個邏輯相比上面其他的邏輯來說,顯得更加的嚴謹。
但是,如果一套伺服器,因為斷電、系統崩潰等原因出現,導致宕機
key一直存在,導致本該執行
finally
中的語句未成功執行完成! !同樣出現
死鎖!
透過超時間解決上述問題
在設定成功
setnx
後,以及搶購程式碼邏輯執行前,增加key的限時。但是上述程式碼的邏輯中依舊會有問題:#########如果處理邏輯中,出現###逾時###問題。 ###當邏輯執行時,時間超過設定key有效時間,此時會出現什麼問題? ##################### 從上圖可以清楚的發現問題:###如果一個請求執行時間超過了key的有效時間。 ###新的請求執行過來時,必然可以拿到key並設定時間;###此時的redis中保存的key並不是請求1的key,而是別的請求設定的。 ###當請求1執行完成後,此處刪除key,刪除的是別的請求設定的key! ###/** * 模拟商品超卖代码 设置setnx保证分布式环境下,数据处理安全行问题;<br> * 但如果某个代码段执行异常,导致key无法清理,出现死锁,添加try...finally;<br> * 如果某个服务因某些问题导致释放key不能执行,导致死锁,此时解决思路为:增加key的有效时间;<br> * 为了保证设置key的值和设置key的有效时间,两条命令构成同一条原子命令,将下列逻辑换成其他代码。 * * @return */ @RequestMapping("/deductStock4") public String deductStock4() { // 创建一个key,保存至redis String key = "lock"; // setnx // 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败 //boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); //让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock", 10, TimeUnit.SECONDS); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } // 设置key有效时间 //stringRedisTemplate.expire(key, 10, TimeUnit.SECONDS); try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 stringRedisTemplate.delete(key); } return "end"; }登入後複製
依然出现了key形同虚设
的问题!如果失效一直存在,超卖问题依旧不会解决。
既然出现key形同虚设的现象,是否可以增加条件,当finally中需要执行删除操作时,获取数据判断值是否是该请求中对应的,如果是则删除,不是则不管!
修改上述代码如下所示:
/** * 模拟商品超卖代码 <br> * 解决`deductStock6`中,key形同虚设的问题。 * * @return */ @RequestMapping("/deductStock5") public String deductStock5() { // 创建一个key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); // setnx //让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, lock_value, 10, TimeUnit.SECONDS); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 // 判断redis中该数据是否是这个接口处理时的设置的,如果是则删除 if(lock_value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(key))) { stringRedisTemplate.delete(key); } } return "end"; }
由于获得锁的线程必须执行完减库存逻辑才能释放锁,所以在此期间所有其他的线程都会由于没获得锁,而直接结束程序,导致有很多库存根本没有卖出去,所以这里应该可以优化,让没获得锁的线程等待,或者循环检查锁
我们将锁封装到一个实体类中,然后加入两个方法,加锁和解锁
@Component public class RedisLock { private final Logger log = LoggerFactory.getLogger(this.getClass()); private final long acquireTimeout = 10*1000; // 获取锁之前的超时时间(获取锁的等待重试时间) private final int timeOut = 20; // 获取锁之后的超时时间(防止死锁) @Autowired private StringRedisTemplate stringRedisTemplate; // 引入String类型redis操作模板 /** * 获取分布式锁 * @return 锁标识 */ public boolean getRedisLock(String lockName,String lockValue) { // 1.计算获取锁的时间 Long endTime = System.currentTimeMillis() + acquireTimeout; // 2.尝试获取锁 while (System.currentTimeMillis() < endTime) { //3. 获取锁成功就设置过期时间 让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockName, lockValue, timeOut, TimeUnit.SECONDS); if (result) { return true; } } return false; } /** * 释放分布式锁 * @param lockName 锁名称 * @param lockValue 锁值 */ public void unRedisLock(String lockName,String lockValue) { if(lockValue.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(lockName))) { stringRedisTemplate.delete(lockName); } } }
@RestController public class RedisController { // 引入String类型redis操作模板 @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RedisLock redisLock; @RequestMapping("/setStock") public String setStock() { stringRedisTemplate.opsForValue().set("stock", "100"); return "ok"; } @RequestMapping("/deductStock") public String deductStock() { // 创建一个key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); try { boolean redisLock = this.redisLock.getRedisLock(key, lock_value);//获取锁 if (redisLock) { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } } finally { redisLock.unRedisLock(key,lock_value); //释放锁 } return "end"; } }
可以看到失败的线程不会直接结束,而是会尝试重试,一直到重试结束时间,才会结束
实际上这个最终版依然存在3个问题
1、在finally流程中,由于是先判断在处理。如果判断条件结束后,获取到的结果为true。但是在执行del操作前,此时jvm在执行GC操作(为了保证GC操作获取GC roots根完全,会暂停java程序),导致程序暂停。在GC操作完成并恢复后,执行del操作时,当前被加锁的key是否仍然存在?
2、问题如图所示
以上是Springboot整合Redis如何實現超賣問題的詳細內容。更多資訊請關注PHP中文網其他相關文章!