【自省】使用Executors.xxx违反阿里Java代码规范,那还不写定时任务了?

Source

一、背景

《分布式锁主动续期的入门级实现-自省 | 简约而不简单》中通过【自省】的方式讨论了关于分布式锁自动续期功能的入门级实现方式,简单同步一下上下文:

  1. 客户端抢到分布式锁之后开始执行任务,执行完毕后再释放分布式锁。
  2. 持锁后因客户端异常未能把锁释放,会导致锁成为永恒锁。
  3. 为了避免这种情况,在创建锁的时候给锁指定一个过期时间。
  4. 到期之后锁会被自动删除掉,这个角度看是对锁资源的一种保护。
  5. 重点:但若锁过期被删除后,任务还没结束怎么办?
  6. 可以通过在一个额外的线程中主动推迟分布式锁的过期时间,下文也用续期一词来表述;避免当任务还没执行完,锁就被删除了

二、理还乱?

逻辑看很简单,也很清晰,但任何事情都有两面性,每个锁配一个额外的线程做watchDog专门去处理,实现起来自然简单清晰,但肯定也有弊端。如果要把锁的功能做的健壮,总要从不断地自我质疑、自我反思中,理顺思路,寻找答案,我认为这属于自省式学习,以后也想尝试这种模式,一起来试试吧:

  • 问题:如果同时有成百上千个锁呢?

    同时有成百上千个锁,按照上篇中的实现方式,就会对应创建成百上千个线程在做续期工作,但实际上间歇性的续租操作并非高并发操作,只需要几个线程即可。类比一下一群羊只要少数牧羊犬看护的情景?

  • 问题:什么场景下会有同时出现这么多锁呢?

    如运营要做抢购活动,那么就会瞬间有成百上千的下单请求进入服务中,在高并发场景下特别容易出现超时而导致 rpc 重试 ,而这时需要拥有一种防重入的自保护机制的。对防重入感兴趣的这里提供一个直通车:《分布式锁中-基于 Redis 的实现如何防重入》

  • 问题:如何避免创建这么多线程呢?

    池化机制,Java 中提供了用于执行调度任务的线程池,如 ScheduledExecutorService#scheduleAtFixedRate

  • 问题:如何构建

    示例:

     new ScheduledThreadPoolExecutor(corePoolSize,
        new NamedThreadFactory("defaultKeepAlivePool-", true),
        new ThreadPoolExecutor.AbortPolicy()
    复制代码

    构造函数详情:

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }
    复制代码
  • 问题:线程池的构建居然敢用 Integer.MAX_VALUE

    这构造方法明显有问题,你该知道Integer.MAX_VALUE违反了阿里 Java 代码规范。

  • 问题:哪一条规范?

    看下边这条规范中,明确指出允许的请求队列长度为Integer.MAX_VALUE,可能会 xxx 导致 OOM

  • 问题:有Integer.MAX_VALUE就能 OOM 了?

    是的,交给线程池执行的是一个个任务对象,每个任务对象都会占用一定的内存,当线程池处理任务的能力降低,任务数越来越多的时候就 OOM 了。

  • 问题:能举个例子嘛?

    设置 JVM 内存-Xms200m -Xmx200m,JVM 内存上限设定小一些,每个任务里占用的内存给大一些,加速 OOM 报错。

    // 调整VM参数,加速OOM:-Xms200m -Xmx200m
    public static void testOOM(){
        ScheduledExecutorService scheduledExecutorService  =  Executors.newScheduledThreadPool(1);
        try {
            for(int i = 0;i<10000000;i++){
                scheduledExecutorService.scheduleAtFixedRate(() -> {
                    ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor)scheduledExecutorService;
                    int size = executor.getQueue().size();
                    System.out.println("size : " + size);
                    //为了快速复现,任务里给个大内存占用
                    byte[] array = new byte[1024*1024*10];
                    try {
                        TimeUnit.MINUTES.sleep(10);
                        int length  = array.length;
                        System.out.println(length);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                },5,5,TimeUnit.SECONDS);
            }
            TimeUnit.HOURS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    复制代码
  • 问题:结果啥样?

    果然模拟出了 OOM,结果如下:

    size : 1637400
    size : 1637778
    ...
    Exception in thread "main"
    java.lang.OutOfMemoryError: GC overhead limit exceeded
            at java.util.concurrent.Executors.callable(Executors.java:407)
            at java.util.concurrent.FutureTask.<init>(FutureTask.java:152)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.<init>(ScheduledThreadPoolExecutor.java:219)
            at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:570)
            at TestSchedule.testOOM(TestSchedule.java:107
            at TestSchedule.main(TestSchedule.java:8)
    复制代码
  • 问题:那ScheduledThreadPool就不能用了?

    看场景,对于分布式锁看门口的场景下,一个系统中除非恶意写 bug;否则按照常规并发设置,如 Dubbo Provider 实例中,一般线程数是设置为 200,那并发情况下同时也就存在 200 个锁,从续期任务的提交看,最多只有 200 个,这个任务数,远远不到 Integer.MAX。

  • 问题:那要是就无限创建锁呢? 这种不讲场景的无限 xxx 的操作是 bug。很多操作被无限 xxx 后都能 OOM 。

  • 问题:那我只能用它了?

    从网络中搜集到的资料情况来看,ScheduledThreadPool最多,读者老师若熟悉其它的池化调度组件,也烦请留言告知。

  • 问题:虽然只提交了 200 个任务,但任务是定时触发的,这有风险的呀?

    定时重复执行的任务,如果每 5 秒执行一次,一个任务执行的耗时在 30 秒,那任务数就越来越多了。

  • 问题:定时任务数会因执行慢而越来越多嘛?

    public static void testInterval(){
    ScheduledExecutorService scheduledExecutorService =  Executors.newScheduledThreadPool(1);
    try {
        System.out.println("start : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                System.out.println("enter : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
                TimeUnit.SECONDS.sleep(10);
                System.out.println("exist : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },5,5,TimeUnit.SECONDS);
    
        TimeUnit.HOURS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    }
    复制代码

    任务没结束时,下个任务并未开始,而是等任务结束后才开始,那就没有了耗时大于定时间隔,而导致任务越来越多的风险了。

    start : 2022-12-06T17:33:39.177 // 任务开始
    enter : 2022-12-06T17:33:44.239 // 5秒后首次执行
    exist : 2022-12-06T17:33:54.239 // 任务耗时10秒
    enter : 2022-12-06T17:33:54.239 // 要求任务间隔是5秒,但已耗时10秒,超过了5秒,任务没结束时,下个任务并未开始,而是等任务结束后才开始
    exist : 2022-12-06T17:34:04.24
    enter : 2022-12-06T17:34:04.24
    exist : 2022-12-06T17:34:14.241
    enter : 2022-12-06T17:34:14.241
    exist : 2022-12-06T17:34:24.241
    复制代码
  • 问题:那任务执行耗时 小于 定时间隔的,什么时机开始下一次任务呢?

        public static void testInterval(){
            ScheduledExecutorService scheduledExecutorService =  Executors.newScheduledThreadPool(1);
            try {
                System.out.println("start : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
                scheduledExecutorService.scheduleAtFixedRate(() -> {
                    try {
                        System.out.println("enter : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
                        TimeUnit.SECONDS.sleep(5);
                        System.out.println("exist : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                },10,10,TimeUnit.SECONDS);
    
                TimeUnit.HOURS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        ```
    
        ```
        start : 2022-12-06T17:31:01.771 //任务开始
        enter : 2022-12-06T17:31:11.846 //10秒后首次执行
        exist : 2022-12-06T17:31:16.846 //任务耗时5秒
        enter : 2022-12-06T17:31:21.845 //因间隔10秒,任务耗时5秒,所以5秒后继续重新执行任务
        exist : 2022-12-06T17:31:26.846
        enter : 2022-12-06T17:31:31.846
        exist : 2022-12-06T17:31:36.846
        enter : 2022-12-06T17:31:41.847
        exist : 2022-12-06T17:31:46.847
    复制代码
  • 问题:所以scheduleAtFixedRate的逻辑结论是?

    • 如果上一个任务的执行时间大于等待时间,任务结束后,下一个任务马上执行。
    • 如果上一个任务的执行时间小于等待时间,任务结束后,下一个任务在(间隔时间-执行时间)后开始执行。
  • 问题:那 scheduleWithFixedDealy 呢?

    • 如果上个任务的执行时间大于等待时间,任务结束后也会等待相应的时间才执行下一个任务

三、新的思考

  • 问题:那阿里的规范写的有问题?

    可再认真阅读此规范,微妙之处请读者老师自行品鉴,也可留言讨论

  • 问题:那是不是就可以放心大胆的霍霍了?

    那肯定是不能随心所欲的。

  • 问题:那还有什么注意事项?

    还不累嘛?休息休息,日更该发稿了,要不审核就下班了。

  • 问题:掘金审核还下班?

    好问题,审核是什么机制呢,欢迎留言讨论,咱们下一篇再聊。


四、最后说一句

我是石页兄,如果这篇文章对您有帮助,或者有所启发的话,欢迎关注笔者的微信公众号【 架构染色 】进行交流和学习。您的支持是我坚持写作最大的动力。