먼저 최종 아키텍처 다이어그램을 살펴보겠습니다.
이 다이어그램을 기반으로 요청 흐름에 대해 간략하게 설명하겠습니다. 나중에 어떻게 개선하더라도 변경되지 않을 것이기 때문입니다.
The 프런트 엔드 요청은 웹 계층에 들어가고 해당 코드는 컨트롤러입니다.
그런 다음 실제 재고 확인, 주문 배치 및 기타 요청이 서비스 계층으로 전송됩니다. RPC 호출은 여전히 Dubbo를 사용하지만 *** 버전으로 업데이트됩니다.
그런 다음 서비스 계층이 데이터를 구현하고 주문이 이루어집니다. flash 플래시 판매 시나리오를 제쳐두고 정상 주문 프로세스를 다음 단계로 간단히 나눌 수 있습니다.
Payment
위 아키텍처를 기반으로 다음과 같은 구현이 있습니다. 먼저 실제 프로젝트 구조를 살펴보겠습니다.
여전히 이전과 동일 :
웹 계층은 단순히 Spring MVC입니다.
서비스 계층은 실제 데이터 구현입니다.
데이터베이스에는 주문을 시뮬레이션하는 두 개의 간단한 테이블만 있습니다.
CREATE TABLE `stock` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(50) NOT NULL DEFAULT '' COMMENT '名称', `count` int(11) NOT NULL COMMENT '库存', `sale` int(11) NOT NULL COMMENT '已售', `version` int(11) NOT NULL COMMENT '乐观锁,版本号', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; CREATE TABLE `stock_order` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `sid` int(11) NOT NULL COMMENT '库存ID', `name` varchar(30) NOT NULL DEFAULT '' COMMENT '商品名称', `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=55 DEFAULT CHARSET=utf8;
웹 레이어 컨트롤러 구현:
@Autowired private StockService stockService; @Autowired private OrderService orderService; @RequestMapping("/createWrongOrder/{sid}") @ResponseBody public String createWrongOrder(@PathVariable int sid) { logger.info("sid=[{}]", sid); int id = 0; try { id = orderService.createWrongOrder(sid); } catch (Exception e) { logger.error("Exception",e); } return String.valueOf(id); }
서비스 레이어, OrderService 구현, 우선 API 구현(출력 인터페이스는 API에서 제공됨):
@Service public class OrderServiceImpl implements OrderService { @Resource(name = "DBOrderService") private com.crossoverJie.seconds.kill.service.OrderService orderService ; @Override public int createWrongOrder(int sid) throws Exception { return orderService.createWrongOrder(sid); } }
DBOrderService 구현:
Transactional(rollbackFor = Exception.class) @Service(value = "DBOrderService") public class OrderServiceImpl implements OrderService { @Resource(name = "DBStockService") private com.crossoverJie.seconds.kill.service.StockService stockService; @Autowired private StockOrderMapper orderMapper; @Override public int createWrongOrder(int sid) throws Exception{ //校验库存 Stock stock = checkStock(sid); //扣库存 saleStock(stock); //创建订单 int id = createOrder(stock); return id; } private Stock checkStock(int sid) { Stock stock = stockService.getStockById(sid); if (stock.getSale().equals(stock.getCount())) { throw new RuntimeException("库存不足"); } return stock; } private int saleStock(Stock stock) { stock.setSale(stock.getSale() + 1); return stockService.updateStockById(stock); } private int createOrder(Stock stock) { StockOrder order = new StockOrder(); order.setSid(stock.getId()); order.setName(stock.getName()); int id = orderMapper.insertSelective(order); return id; } }
Order table
모든 것이 괜찮아 보이고 데이터도 정상입니다. 그러나 동시성 테스트를 위해 JMeter를 사용하는 경우:
테스트 구성은 동시에 300개 스레드입니다. 두 차례의 테스트 후 데이터베이스에서 결과를 살펴보겠습니다. 요청이 모두 성공적으로 응답되었고 실제로 재고가 차감되었지만 주문에 대해 124개의 레코드가 생성되었습니다. . 이는 분명히 전형적인 과매도 현상입니다. 사실 지금 인터페이스를 수동으로 호출하면 부족한 재고를 돌려주는데 너무 늦습니다.Optimistic lock update
위의 현상을 피하는 방법은? 가장 간단한 방법은 자연스러운 낙관적 잠금입니다. 구체적인 구현을 살펴보겠습니다.
사실, 주로 서비스 계층이 변경되었습니다.
@Override public int createOptimisticOrder(int sid) throws Exception { //校验库存 Stock stock = checkStock(sid); //乐观锁更新库存 saleStockOptimistic(stock); //创建订单 int id = createOrder(stock); return id; } private void saleStockOptimistic(Stock stock) { int count = stockService.updateStockByOptimistic(stock); if (count == 0){ throw new RuntimeException("并发更新库存失败") ; } }
<update> update stock <set> sale = sale + 1, version = version + 1, </set> WHERE id = #{id,jdbcType=INTEGER} AND version = #{version,jdbcType=INTEGER} </update>
로그를 확인하고 다음을 찾으세요.
많은 동시 요청이 오류로 응답하여 효과를 얻습니다.
처리량 향상플래시 세일 중 처리량과 응답 효율성을 더욱 향상시키기 위해 여기의 웹과 서비스가 모두 수평으로 확장되었습니다.
Web은 로드에 Nginx를 사용합니다.
서비스에도 다양한 애플리케이션이 있습니다.再用 JMeter 测试时可以直观的看到效果。
由于我是在阿里云的一台小水管服务器进行测试的,加上配置不高、应用都在同一台,所以并没有完全体现出性能上的优势( Nginx 做负载转发时候也会增加额外的网络消耗)。
Shell 脚本实现简单的 CI
由于应用多台部署之后,手动发版测试的痛苦相信经历过的都有体会。
这次并没有精力去搭建完整的 CICD,只是写了一个简单的脚本实现了自动化部署,希望给这方面没有经验的同学带来一点启发。
构建 Web:
#!/bin/bash # 构建 web 消费者 #read appname appname="consumer" echo "input="$appname PID=$(ps -ef | grep $appname | grep -v grep | awk '{print $2}') # 遍历杀掉 pid for var in ${PID[@]}; do echo "loop pid= $var" kill -9 $var done echo "kill $appname success" cd .. git pull cd SSM-SECONDS-KILL mvn -Dmaven.test.skip=true clean package echo "build war success" cp /home/crossoverJie/SSM/SSM-SECONDS-KILL/SSM-SECONDS-KILL-WEB/target/SSM-SECONDS-KILL-WEB-2.2.0-SNAPSHOT.war /home/crossoverJie/tomcat/tomcat-dubbo-consumer-8083/webapps echo "cp tomcat-dubbo-consumer-8083/webapps ok!" cp /home/crossoverJie/SSM/SSM-SECONDS-KILL/SSM-SECONDS-KILL-WEB/target/SSM-SECONDS-KILL-WEB-2.2.0-SNAPSHOT.war /home/crossoverJie/tomcat/tomcat-dubbo-consumer-7083-slave/webapps echo "cp tomcat-dubbo-consumer-7083-slave/webapps ok!" sh /home/crossoverJie/tomcat/tomcat-dubbo-consumer-8083/bin/startup.sh echo "tomcat-dubbo-consumer-8083/bin/startup.sh success" sh /home/crossoverJie/tomcat/tomcat-dubbo-consumer-7083-slave/bin/startup.sh echo "tomcat-dubbo-consumer-7083-slave/bin/startup.sh success" echo "start $appname success"
构建 Service:
# 构建服务提供者 #read appname appname="provider" echo "input="$appname PID=$(ps -ef | grep $appname | grep -v grep | awk '{print $2}') #if [ $? -eq 0 ]; then # echo "process id:$PID" #else # echo "process $appname not exit" # exit #fi # 遍历杀掉 pid for var in ${PID[@]}; do echo "loop pid= $var" kill -9 $var done echo "kill $appname success" cd .. git pull cd SSM-SECONDS-KILL mvn -Dmaven.test.skip=true clean package echo "build war success" cp /home/crossoverJie/SSM/SSM-SECONDS-KILL/SSM-SECONDS-KILL-SERVICE/target/SSM-SECONDS-KILL-SERVICE-2.2.0-SNAPSHOT.war /home/crossoverJie/tomcat/tomcat-dubbo-provider-8080/webapps echo "cp tomcat-dubbo-provider-8080/webapps ok!" cp /home/crossoverJie/SSM/SSM-SECONDS-KILL/SSM-SECONDS-KILL-SERVICE/target/SSM-SECONDS-KILL-SERVICE-2.2.0-SNAPSHOT.war /home/crossoverJie/tomcat/tomcat-dubbo-provider-7080-slave/webapps echo "cp tomcat-dubbo-provider-7080-slave/webapps ok!" sh /home/crossoverJie/tomcat/tomcat-dubbo-provider-8080/bin/startup.sh echo "tomcat-dubbo-provider-8080/bin/startup.sh success" sh /home/crossoverJie/tomcat/tomcat-dubbo-provider-7080-slave/bin/startup.sh echo "tomcat-dubbo-provider-8080/bin/startup.sh success" echo "start $appname success"
之后每当我有更新,只需要执行这两个脚本就可以帮我自动构建。都是最基础的 Linux 命令,相信大家都看得明白。
乐观锁更新 + 分布式限流
上文的结果看似没有问题,其实还差得远呢。这里只是模拟了 300 个并发没有问题,但是当请求达到了 3000,3W,300W 呢?
虽说可以横向扩展支撑更多的请求,但是能不能利用最少的资源解决问题呢?
仔细分析下会发现:假设我的商品一共只有 10 个库存,那么无论你多少人来买其实最终也最多只有 10 人可以下单成功。所以其中会有 99% 的请求都是无效的。
大家都知道:大多数应用数据库都是压倒骆驼的***一根稻草。通过 Druid 的监控来看看之前请求数据库的情况:
因为 Service 是两个应用:
数据库也有 20 多个连接。怎么样来优化呢?其实很容易想到的就是分布式限流。
我们将并发控制在一个可控的范围之内,然后快速失败这样就能***程度的保护系统。
①distributed-redis-tool ⬆v1.0.3
因为加上该组件之后所有的请求都会经过 Redis,所以对 Redis 资源的使用也是要非常小心。
②API 更新
修改之后的 API 如下:
@Configuration public class RedisLimitConfig { private Logger logger = LoggerFactory.getLogger(RedisLimitConfig.class); @Value("${redis.limit}") private int limit; @Autowired private JedisConnectionFactory jedisConnectionFactory; @Bean public RedisLimit build() { RedisLimit redisLimit = new RedisLimit.Builder(jedisConnectionFactory, RedisToolsConstant.SINGLE) .limit(limit) .build(); return redisLimit; } }
这里构建器改用了 JedisConnectionFactory,所以得配合 Spring 来一起使用。
并在初始化时显示传入 Redis 是以集群方式部署还是单机(强烈建议集群,限流之后对 Redis 还是有一定的压力)。
③限流实现
既然 API 更新了,实现自然也要修改:
/** * limit traffic * @return if true */ public boolean limit() { //get connection Object connection = getConnection(); Object result = limitRequest(connection); if (FAIL_CODE != (Long) result) { return true; } else { return false; } } private Object limitRequest(Object connection) { Object result = null; String key = String.valueOf(System.currentTimeMillis() / 1000); if (connection instanceof Jedis){ result = ((Jedis)connection).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit))); ((Jedis) connection).close(); }else { result = ((JedisCluster) connection).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit))); try { ((JedisCluster) connection).close(); } catch (IOException e) { logger.error("IOException",e); } } return result; } private Object getConnection() { Object connection ; if (type == RedisToolsConstant.SINGLE){ RedisConnection redisConnection = jedisConnectionFactory.getConnection(); connection = redisConnection.getNativeConnection(); }else { RedisClusterConnection clusterConnection = jedisConnectionFactory.getClusterConnection(); connection = clusterConnection.getNativeConnection() ; } return connection; }
如果是原生的 Spring 应用得采用 @SpringControllerLimit(errorCode=200) 注解。
实际使用如下,Web 端:
/** * 乐观锁更新库存 限流 * @param sid * @return */ @SpringControllerLimit(errorCode = 200) @RequestMapping("/createOptimisticLimitOrder/{sid}") @ResponseBody public String createOptimisticLimitOrder(@PathVariable int sid) { logger.info("sid=[{}]", sid); int id = 0; try { id = orderService.createOptimisticOrder(sid); } catch (Exception e) { logger.error("Exception",e); } return String.valueOf(id); }
Service 端就没什么更新了,依然是采用的乐观锁更新数据库。
再压测看下效果 /createOptimisticLimitOrderByRedis/1:
首先是看结果没有问题,再看数据库连接以及并发请求数都有明显的下降。
乐观锁更新+分布式限流+Redis 缓存
仔细观察 Druid 监控数据发现这个 SQL 被多次查询:
其实这是实时查询库存的 SQL,主要是为了在每次下单之前判断是否还有库存。
这也是个优化点。这种数据我们完全可以放在内存中,效率比在数据库要高很多。
由于我们的应用是分布式的,所以堆内缓存显然不合适,Redis 就非常适合。
这次主要改造的是 Service 层:
每次查询库存时走 Redis。
扣库存时更新 Redis。
需要提前将库存信息写入 Redis。(手动或者程序自动都可以)
主要代码如下:
@Override public int createOptimisticOrderUseRedis(int sid) throws Exception { //检验库存,从 Redis 获取 Stock stock = checkStockByRedis(sid); //乐观锁更新库存 以及更新 Redis saleStockOptimisticByRedis(stock); //创建订单 int id = createOrder(stock); return id ; } private Stock checkStockByRedis(int sid) throws Exception { Integer count = Integer.parseInt(redisTemplate.opsForValue().get(RedisKeysConstant.STOCK_COUNT + sid)); Integer sale = Integer.parseInt(redisTemplate.opsForValue().get(RedisKeysConstant.STOCK_SALE + sid)); if (count.equals(sale)){ throw new RuntimeException("库存不足 Redis currentCount=" + sale); } Integer version = Integer.parseInt(redisTemplate.opsForValue().get(RedisKeysConstant.STOCK_VERSION + sid)); Stock stock = new Stock() ; stock.setId(sid); stock.setCount(count); stock.setSale(sale); stock.setVersion(version); return stock; } /** * 乐观锁更新数据库 还要更新 Redis * @param stock */ private void saleStockOptimisticByRedis(Stock stock) { int count = stockService.updateStockByOptimistic(stock); if (count == 0){ throw new RuntimeException("并发更新库存失败") ; } //自增 redisTemplate.opsForValue().increment(RedisKeysConstant.STOCK_SALE + stock.getId(),1) ; redisTemplate.opsForValue().increment(RedisKeysConstant.STOCK_VERSION + stock.getId(),1) ; }
压测看看实际效果 /createOptimisticLimitOrderByRedis/1:
***发现数据没问题,数据库的请求与并发也都下来了。
乐观锁更新+分布式限流+Redis 缓存+Kafka 异步
***的优化还是想如何来再次提高吞吐量以及性能的。我们上文所有例子其实都是同步请求,完全可以利用同步转异步来提高性能啊。
这里我们将写订单以及更新库存的操作进行异步化,利用 Kafka 来进行解耦和队列的作用。
每当一个请求通过了限流到达了 Service 层通过了库存校验之后就将订单信息发给 Kafka ,这样一个请求就可以直接返回了。
消费程序再对数据进行入库落地。因为异步了,所以最终需要采取回调或者是其他提醒的方式提醒用户购买完成。
这里代码较多就不贴了,消费程序其实就是把之前的 Service 层的逻辑重写了一遍,不过采用的是 Spring Boot。
위 내용은 Java를 사용하여 효율적인 플래시 세일 시스템을 구현하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!