多线程超时中断

在Java的多线程环境下,如果主线程最多等待子线程执行指定时间就要返回,保证不会因为执行时间过长而长时间没有响应。

一般有这么几种途径,线程本身中断,线程池超时中断,计数器等等,有如下几种方式:

  • Thread.join()
  • 守护线程
  • 线程池+FutureTask
  • CountDownLatch.await()
    上面列出的这几种方案,都带了超时退出机制,下面我们来对比一下这几种方案

线程join

join()Thread类中的一个方法,该方法有1个参数millis,该参数表示最多等待millis之后线程死亡。

要注意,在线程A中获得到线程B的对象,是在线程A中执行的线程B的join()方法,所以也是线程A循环执行wait()等待B线程的。

下面我们来看一下源码,看看join()方法是如何执行的

    public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
 
        //0表示永久等待
        if (millis == 0) {
           //isAlive()判断当前线程是否存活
            while (isAlive()) {
                //Object.wait()方法等待获取对象锁
                wait(0);
            }
        } else {
            //isAlive()判断当前线程是否存活
            while (isAlive()) {
                long delay = millis - now;
                //超时退出循环
                if (delay <= 0) {
                    break;
                }
                //Object.wait()方法等待获取对象锁
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

isAlive()Thread类中定义的方法,是一个native方法,用于判断当前线程是否存活

    /**
     * Tests if this thread is alive. A thread is alive if it has
     * been started and has not yet died.
     *
     * @return  <code>true</code> if this thread is alive;
     *          <code>false</code> otherwise.
     */
    public final native boolean isAlive();

wait()Object类中的方法,用于等待获取对象锁,wait()/notify()是常用的等待通知机制

    public final native void wait(long timeout) throws InterruptedException;

join()方法比较简单,就是循环判断线程是否存活,执行等待方法,等待超时或者中断。

下面我们来写一个简单示例,配合debug,来看一下执行的过程

public class JoinTest {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("主线程开始执行:"+Thread.currentThread().getName());
        
        Thread t = new Thread(() -> {
            long begin = System.currentTimeMillis();
            
            System.out.println("开始执行子线程:" + Thread.currentThread().getName());
            
            try {
                Thread.sleep(10000);
                System.out.println("子线程执行耗时:"+  (System.currentTimeMillis()-begin));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        );
        
        t.start();
        
        long joinBegin = System.currentTimeMillis();
        t.join(1000);
        System.out.println("join()执行耗时:"+(System.currentTimeMillis()-joinBegin));
        
        System.out.println("主线程执行结束:"+Thread.currentThread().getName());

    }
}

以上的代码就是简单的一个main()方法,在main()里另起一个子线程,子线程睡10000ms,执行t.join(),等待时间是1000ms。

示例输出结果如下

主线程开始执行:main
开始执行子线程:Thread-0
join()执行耗时:1006
主线程执行结束:main
子线程执行耗时:10010

Process finished with exit code 0

可以看出主线程在等待1000ms之后就不阻塞继续往下执行了,而子线程还在继续执行,所以join()会让主线程退出阻塞状态,而不会使子线程结束。

下面我们采用debug模式看一下执行过程中的线程情况。

我们在join()方法里,判断delay<=0这个地方打一个条件断点,当delay<=0的时候停在断点处。

image.png

断点停住,然后现在的线程是main,即是我们的主线程在执行join()

image.png

也就是说主线程执行时间超过等待时间之后,会结束循环,继续执行。

而我们的子线程现在的状态还是SLEEPING,等睡眠时间到了才会继续执行

image.png

t.join()是由主线程执行的。

main线程获得了Thread-0的线程对象,然后main线程去循环等待,线程阻塞住,直至超过等待时间,main线程退出阻塞,继续往下执行。

join()符合我们的需求,即最多等待子线程执行多长时间返回。不过join()一般来说使用不是太多,大家可能对join()不太熟悉。

守护线程

JVM判断是否退出的标志是是否存在非守护线程存活,如果只剩下守护线程存活,那么JVM就可以结束退出了。

我们可以使用守护线程监视一个业务线程,让守护线程睡一定的时间,睡醒后中断监视的线程,这样就可以达到超时退出的目的。

使用线程中断退出的精度不高,中断只是设置一个中断标志,线程并不会立即结束。

public class DaemonThread {

    static class Task implements Runnable {
        private final String name;
        private final int time;

        public Task(String s, int t) {
            name = s;
            time = t;
        }

        public int getTime() {
            return time;
        }

        @Override
        public void run() {
            for (int i = 0; i < time; ++i) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.println(name
                            + " is interrupted when calculating, will stop...");
                    return; // 注意这里如果不return的话,线程还会继续执行,所以任务超时后在这里处理结果然后返回
                }
                System.out.println("task " + name + " " + (i + 1) + " round");
            }
            System.out.println("task " + name + " finished successfully");
        }
    }

    static class Daemon implements Runnable {
        List<Runnable> tasks = new ArrayList<Runnable>();
        private final Thread thread;
        private final int time;

        public Daemon(Thread r, int t) {
            thread = r;
            time = t;
        }

        public void addTask(Runnable r) {
            tasks.add(r);
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(time * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                thread.interrupt();
            }
        }
    }

    public static void main(String[] args) {
        Task task1 = new Task("one", 5);
        Thread t1 = new Thread(task1);
        Daemon daemon = new Daemon(t1, 3);
        Thread daemoThread = new Thread(daemon);
        daemoThread.setDaemon(true);
        t1.start();
        daemoThread.start();
    }
}

线程池+FutureTask

FutureTask是一个可取消的异步计算,提供了对Future的基本实现。

线程池submit()一个任务后,可以使用future.get()设置超时等待时间,来达到超时退出的目的。

public class FutureTaskTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-pool-%d").build();
        ThreadPoolExecutor serviceExecutor = new ThreadPoolExecutor(1, 2, 1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1), threadFactory, (r, executor) -> System.out.println("Reject Task"));

        Task task = new Task();

        Future<Integer> future = serviceExecutor.submit(task);

        Integer integer = future.get(1, TimeUnit.SECONDS);

        System.out.println(integer);

        serviceExecutor.shutdown();


    }

    static class Task implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("子线程在进行计算");
            Thread.sleep(3000);
            int sum = 0;
            for(int i=0;i<100;i++) {
                sum += i;
            }
            return sum;
        }
    }
}

CountDownLatch

CountDownLatch也提供了超时等待的API,也可以达到超时等待的效果。

CountDownLatch等待超时之后返回false,不抛出异常。

public class CountDownTest {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);

        Task task = new Task(countDownLatch);

        Thread thread = new Thread(task);

        thread.start();

        boolean await = countDownLatch.await(1, TimeUnit.SECONDS);

        System.out.println(await);

    }

    static class Task implements Runnable {

        private CountDownLatch latch;

        public Task(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            System.out.println("子线程" + Thread.currentThread().getName() + "开始执行");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("子线程"+Thread.currentThread().getName()+"执行完成");
            latch.countDown();//当前线程调用此方法,则计数减一

        }
    }
}

小结

一般多线程情况下超时退出,线程有超时等待方法,也可以考虑使用有超时等待的锁。

最好的情况还是使用线程池+FutureTask,比较简单可靠。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容