1. 什么是DelayQueue
DelayQueue是JDK concurrent包下提供的一个类,实现了Queue接口。其本质是一个队列数据结构。
DelayQueue的元素必须是Delayed接口,该接口继承Comparable。
接口提供getDelay的方法,返回延时剩余时间,当返回值为0时,才能取出元素。
2. DelayQueue能用在什么地方
DelayQueue一般用于生产者消费者模式,典型案例就是订单系统中的超时支付。
3. 最佳实践
使用DelayQueue给订单超时支付进行一个建模。首先,在订单中定义一个超时时间,这个在创建订单时就应该确定。
并且在创建订单后,将订单加入一个DelayQueue,之后程序循环从队列取出订单执行关单处理的逻辑。
3.1 创建一个数据库实体类
实体类需要实现Delayed接口,重写getDelay方法,返回订单的过期时间减去当前系统时间来确定订单是否过期,也就是能否从队列中取出。
以下代码使用了mybatis-plus和lombok依赖。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Data @TableName("`order`") public class Order implements Delayed { @TableId private String id; private String orderStatus; private Date expiredTime;
@Override public long getDelay(TimeUnit unit) { return this.expiredTime.getTime() - System.currentTimeMillis(); }
@Override public int compareTo(Delayed o) { if (this == o) { return 0; } Order order = (Order) o; long time = this.expiredTime.getTime() - order.getExpiredTime().getTime(); return time == 0 ? 0 : time < 0 ? -1 : 1; } }
|
3.2 定义订单服务
由于循环从队列取出的任务必须在后台执行,所以在订单服务中需使用到线程池,并将这个任务提交给线程池处理。
由于这一任务在应用关闭后就会停止,所以在应用启动时,就应该将数据中的订单数据读出并加入队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Slf4j @Service public class OrderService implements InitializingBean { @Autowired private OrderMapper orderMapper; private final DelayQueue<Order> delayQueue = new DelayQueue<>(); private final AtomicBoolean start = new AtomicBoolean(false); private final ExecutorService executorService = new ThreadPoolExecutor( 3, 6, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() );
@Override public void afterPropertiesSet() { start(); }
public void start() { if (start.get()) { return; } start.set(true); List<Order> list = orderMapper.selectList(Wrappers.lambdaQuery(Order.class) .eq(Order::getOrderStatus, "0")); delayQueue.addAll(list); executorService.submit(() -> { while (start.get()) { try { Order order = delayQueue.take(); orderMapper.update(null, Wrappers.lambdaUpdate(Order.class) .set(Order::getOrderStatus, "1") .eq(Order::getId, order.getId())); log.info("订单id={},已过期", order.getId()); } catch (Exception e) { e.printStackTrace(); } } }); } }
|