模拟秒杀场景,参考以下博客完成:
项目用到的技术如下:
项目目录如图所示:
数据库表:
CREATE TABLE `stock` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(50) NOT NULL DEFAULT '' COMMENT '名称',
`count` int(11) NOT NULL COMMENT '库存',
`sale` int(11) NOT NULL COMMENT '已售',
`version` int(11) NOT NULL COMMENT '乐观锁,版本号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
CREATE TABLE `stock_order` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`sid` int(11) NOT NULL COMMENT '库存ID',
`name` varchar(30) NOT NULL DEFAULT '' COMMENT '商品名称',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=55 DEFAULT CHARSET=utf8;
库存stock表插入一条数据:
INSERT INTO `kill-sale`.`stock` (`id`, `name`, `count`, `sale`, `version`) VALUES ('1', 'java编程思想', '10', '0', '0');
service层主要逻辑:
/**
* 下单流程
* @param stockId
* @return
*/
public ResultObj createOrder(Integer stockId) {
ResultObj result = ResultObj.getFail();
Stock stock = stockMapper.selectByPrimaryKey(stockId);
//校验库存
result = checkStock(stock);
if(ResultObj.succCode!=result.getCode()){
return result;
}
//减库存
result = deductStock(stock);
if(ResultObj.succCode!=result.getCode()){
return result;
}
//创建订单
createStockOrder(stock);
return result;
}
/**
* 校验库存是否充足
* @param stock
* @return
*/
private ResultObj checkStock(Stock stock) {
ResultObj result = ResultObj.getSuccess();
if(stock==null){
return new ResultObj(OrderResult.STOCK_EMPTY.getCode(),OrderResult.STOCK_EMPTY.getMsg());
}
if(stock.getSale()>=stock.getCount()){
return new ResultObj(OrderResult.STOCK_EMPTY.getCode(),OrderResult.STOCK_EMPTY.getMsg());
}
return result;
}
/**
* 扣库存
* @param stock
* @return
*/
private ResultObj deductStock(Stock stock) {
stock.setSale(stock.getSale()+1);
int i = stockMapper.updateByPrimaryKeySelective(stock);
if(i<=0){
return new ResultObj(OrderResult.UPDATE_STOCK_FAIL.getCode(),OrderResult.UPDATE_STOCK_FAIL.getMsg());
}
return ResultObj.getSuccess();
}
/**
* 创建订单
* @param stock
* @return
*/
private void createStockOrder(Stock stock) {
StockOrder order = new StockOrder();
order.setName(stock.getName());
order.setSid(stock.getId());
order.setCreateTime(new Date());
int i = stockOrderMapper.insert(order);
if(i<=0){
throw new RuntimeException("创建订单失败!"); //事务回滚
}
}
controller层逻辑如下:
@Resource
private OrderService orderService;
@GetMapping("/order/{sid}")
public Object order(@PathVariable Integer sid){
ResultObj result = ResultObj.getFail();
try{
result = orderService.createOrder(sid);
logger.info("下单结果:code="+result.getCode()+",msg="+result.getMsg());
}catch(Exception e){
logger.error("下单失败",e);
result.setMsg(e.toString());
}
return result;
}
用jmeter测试: 100个线程,5个循环测试
数据库结果如下:
我们发现这时库存表stock销售数目为10,而订单表stock-order订单数目为489条,这时出现了超卖。
优化一:使用乐观锁防超卖 使用stock表的version字段做版本控制,每次减库存时判断当前版本号是否跟查询出来的版本号一致 实现如下,修改如下: orderServiceImpl:
/**
* 扣库存
* @param stock
* @return
*/
private ResultObj deductStock(Stock stock) {
//这种情况会出现超卖现象
//stock.setSale(stock.getSale()+1);
//int i = stockMapper.updateByPrimaryKeySelective(stock);
//优化一:使用乐观锁(版本号)方式防止超卖
int i = stockMapper.decreaseStockOptimistic(stock);
if(i<=0){
return new ResultObj(OrderResult.UPDATE_STOCK_FAIL.getCode(),OrderResult.UPDATE_STOCK_FAIL.getMsg());
}
return ResultObj.getSuccess();
}
StockMapper:
<update id="decreaseStockOptimistic" parameterType="com.lyang.mall.api.order.entity.Stock">
update stock
set sale = #{sale}+1,
version = #{version}+1
where id = #{id,jdbcType=INTEGER} and version = #{version,jdbcType=INTEGER}
</update>
数据库数据还原后再次用使用jmeter测试,结果如下:
此时10个库存卖完,创建10笔订单,解决了超卖问题!
此时,如果我们把数据库还原后,将库存数改为1000,用并发数100测试,10个循环测试。 数据库结果为:
咱们发现没有出现超卖现象,但是1000的库存只卖了34个,这样问题又来了,由于乐观锁版本控制,每次更新时都要比较版本号,在高并发的情况下,会出现大量的扣库存失败情况,导致的结果就是很多想买的客户买不到。
优化二:乐观锁改良(只要不超卖就ok) stockMapper.xml:
<update id="decreaseStockOptimistic" parameterType="com.lyang.mall.api.order.entity.Stock">
update stock
set sale = sale + 1
where id = #{id,jdbcType=INTEGER} and sale < count
</update>
注意sql中必须是sale<count或者sale+1<=count,否则会超卖一件。 还原数据库再次测试:
库存全部卖完并且没有超卖现象。
咱们再设想下如果产品库存不多,比如10个,而购买人特别多,比如1000,其实大多数的购买请求都是不会成功的,而这些请求却会增加服务器压力。 我们把库存改为10,用jmeter测试,线程数100,10个循环 我们查看druid的监控页面
我们发现查询次数1000次,减库存更新操作100次,更新成功的10次,创建订单10次,在库存远小于请求购买量的情况下,很多的请求都是不会成功的,但是这些请求却给服务器造成很大压力。
优化三:限流 限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务。 我们先限制能访问的请求数,使用AtomicInteger实现。
OrderController:
private final AtomicInteger count = new AtomicInteger(1);
@Resource
private OrderService orderService;
@GetMapping("/order/{sid}")
public Object order(@PathVariable Integer sid){
ResultObj result = ResultObj.getFail();
try{
if(count.incrementAndGet()>300){
result.setMsg("请求太多了,就不接待你了。。。");
return result;
}
System.out.println(count);
result = orderService.createOrder(sid);
logger.info("下单结果:code="+result.getCode()+",msg="+result.getMsg());
}catch(Exception e){
logger.error("下单失败",e);
result.setMsg(e.toString());
}
return result;
}
数据库还原,druid moniter清空数据,jmeter还是用上面的参数进行测试,测试结果如下:
此时查询次数只有299次,减库存更新98次,执行成功10次,创建订单10次,我们发现数据库操作比之前少了很多,所以数据库压力得到有效降低。这次限制的次数视情况而定,理论上说只要能满足将库存卖完,该值越小越好。 另外我们还可以使用Semaphore来限制并发数。
private final Semaphore permit = new Semaphore(10, true);
try{
permit.acquire();
result = orderService.createOrder(sid);
logger.info("下单结果:code="+result.getCode()+",msg="+result.getMsg());
}catch(Exception e){
logger.error("下单失败",e);
result.setMsg(e.toString());
}finally{
permit.release();
}
我们发现使用AtomicInteger限制请求数或者Semaphore限制并发数都只适合单体应用(如果是分布式,每个节点用AtomicInteger和Semaphore控制,也是有效果的,但是不是全局的),如果我们的服务是部署的分布式呢?怎么来全局控制限流呢?
优化三:分布式限流
将mall-web和mall-biz-order分别部署到两台机器,细节就不说明了。 这里参考博客博客地址通过redis来实现,这里也不再细说。
另外我们使用的rpc框架dubbo也是可以做限流的,dubbo用户手册地址。 另外还有Nginx接入层限流,按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流,我们在下一篇文章中详细介绍。
优化四:库存放入redis缓存中
由于每次调用下单接口,第一步都是查询库存,会执行一条select语句,这里我们将从数据库查询库存优化成从redis中查询库存,减小数据库压力。 OrderServiceImpl.java
@Override
public ResultObj createOrder(Integer stockId) {
Stock stock = getStockByRedis(stockId);
//校验库存
ResultObj result = checkStock(stock);
if(ResultObj.succCode!=result.getCode()){
return result;
}
//减库存
result = deductStockByRedis(stock);
if(ResultObj.succCode!=result.getCode()){
return result;
}
//创建订单
createStockOrder(stock);
return result;
}
private ResultObj deductStockByRedis(Stock stock) {
//这里还是使用数据库的乐观锁来控制,防止超卖,能不能利用redis控制呢?
int i = stockMapper.decreaseStockOptimisticInc(stock.getId());
if(i<=0){
return new ResultObj(OrderResult.UPDATE_STOCK_FAIL.getCode(),OrderResult.UPDATE_STOCK_FAIL.getMsg());
}
//更新redis缓存数据
RedisUtil.hincrBy(OrderConstant.REDIS_KEY_STOCK + stock.getId(), "sale", 1);
return ResultObj.getSuccess();
}
private Stock getStockByRedis(Integer stockId) {
Map<String,String> cacheDatas = RedisUtil.hgetall(OrderConstant.REDIS_KEY_STOCK+stockId);
if(cacheDatas==null || cacheDatas.isEmpty()){
throw new RuntimeException("缓存数据为空");
}
//库存总数量
Integer count = Integer.parseInt(cacheDatas.get("count"));
//已销售数量
Integer sale = Integer.parseInt(cacheDatas.get("sale"));
//名称
String name = cacheDatas.get("name");
//版本号
Integer version = Integer.parseInt(cacheDatas.get("version"));
Stock stock = new Stock();
stock.setId(stockId);
stock.setCount(count);
stock.setSale(sale);
stock.setName(name);
stock.setVersion(version);
return stock;
}
然后我们使用监听器,在服务启动的时候讲数据库库存查询出来放到redis中
@WebListener
public class StockDataToRedisListener implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
ServletContext servletContext = servletContextEvent.getServletContext();
WebApplicationContext applicationContext = WebApplicationContextUtils.getWebApplicationContext(servletContext);
StockMapper stockMapper = (StockMapper)applicationContext.getBean(StockMapper.class);
List<Stock> stockList = stockMapper.selectByExample(new StockExample());
if(!CollectionUtils.isEmpty(stockList)){
for(Stock stock : stockList){
//Map<String,Object> cacheData = BeanMapConvertUtil.beanToMap(stock);
RedisUtil.hset(OrderConstant.REDIS_KEY_STOCK+stock.getId(), "count",String.valueOf(stock.getCount()));
RedisUtil.hset(OrderConstant.REDIS_KEY_STOCK+stock.getId(), "sale",String.valueOf(stock.getSale()));
RedisUtil.hset(OrderConstant.REDIS_KEY_STOCK+stock.getId(), "name",String.valueOf(stock.getName()));
RedisUtil.hset(OrderConstant.REDIS_KEY_STOCK+stock.getId(),
"version",String.valueOf(stock.getVersion()));
}
}
}
@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {
}
}
再次测试,将库存设为10,100个线程10轮测试: 数据库没有出现超卖现象,数据准确,看下druid monitor.
我们发现只有一条查询sql,就是服务启动时查询库存放入redis中的sql查询。ok。
优化五:使用rabbitMq解耦
上面的项目中,我们在扣完库存后,直接调用创建订单的方法,在实际秒杀场景中,后面的创建订单过程是比较复杂的,如果能将创建订单的逻辑解耦到另外一个服务中,对减小当前秒杀服务的压力是有显著效果的,并且客户端秒杀响应更快,体验很好,也可以起到流量削峰的作用。所以我们这里使用rabbitmq解耦。 新的系统架构如下:
mall-rabbit-consumer即为消费服务端。
OrderServiceImpl.java
@Override
public ResultObj createOrder(Integer stockId) {
Stock stock = getStockByRedis(stockId);
//校验库存
ResultObj result = checkStock(stock);
if(ResultObj.succCode!=result.getCode()){
return result;
}
//减库存
result = deductStockByRedis(stock);
if(ResultObj.succCode!=result.getCode()){
return result;
}
//创建订单
//createStockOrder(stock);
//使用rabbitmq解耦,流量削峰
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_ORDER, RabbitMQConfig.ROUNTING_KEY_ORDER,
SerializationUtils.serialize(stock));
return result;
}
mall-rabbit-consumer服务中的OrderConsumer.java
@Component
public class OrderConsumer {
private static final Logger logger = LoggerFactory.getLogger(OrderConstant.class);
@Resource
private OrderService orderService;
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME_ORDER)
public String processMessage(byte[] msg) {
try{
System.out.println(Thread.currentThread().getName() +
" 接收到来自"+RabbitMQConfig.QUEUE_NAME_ORDER+"队列的消息:" + msg);
Stock stock = (Stock) SerializationUtils.deserialize(msg);
//由于创建订单的服务仍是mall-biz-order,这里实现一个折中方式,休眠一分钟,避开秒杀的高峰期
Thread.currentThread().sleep(60*1000);
orderService.createStockOrder(stock);
}catch(Exception e){
logger.error("消费订单失败",e);
}
return null;
}
}
rabbitmq这里只实现了最简单的功能,一些其他功能比如消息确认都没实现,只是为了说明解耦的功能。
至此,整个模拟秒杀的项目完成,若有不足的地方,望多提意见,感谢。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。