当ExecutorService上的所有任务完成或取消时,为什么awaitTermination无法可靠地返回

保罗·泰勒

我是我的代码,我向ExecutorService提交了一些任务,然后使用shutdown()和awaitTermination()等待它们完成。但是,如果任何一项任务花费的时间超过某个时间段,我希望取消它而不影响其他任务。我使用来自ExecutorService的代码修改代码,代码在超时后中断任务,如下所示:

package com.jthink.jaikoz.memory;

import com.jthink.jaikoz.MainWindow;

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private boolean isShutdown = false;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

    //Map Task to the Timeout Task that could be used to interrupt it
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public long getTimeout()
    {
        return timeout;
    }

    public TimeUnit getTimeoutUnit()
    {
        return timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        isShutdown = true;
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            //Schedule a task to interrupt the thread that is running the task after time timeout
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);

            //Add Mapping
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        //Remove mapping and cancel timeout task
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }

        if (isShutdown)
        {
            if(getQueue().isEmpty())
            {
                //Queue is empty so all tasks either finished or currently running
                MainWindow.logger.severe("---Thread Pool Queue is Empty");
                timeoutExecutor.shutdown();
            }
        }
    }

    /**
     * Interrupt the thread
     *
     */
    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            MainWindow.logger.severe("Cancelling task because taking too long");
            thread.interrupt();
        }
    }
}

一个测试用例,用于说明何时有时间完成任务以及何时都无法正常工作

package com.jthink.jaikoz;

import com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor;
import junit.framework.TestCase;

import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Created by Paul on 08/12/2014.
 */
public class TestThreadPool extends TestCase
{
    public void testThreadPoolTasksComplete() throws Exception
    {
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 6, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(5000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Program done");
    }

    public void testThreadPoolTasksCancelled() throws Exception
    {
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(5000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Program done");
    }
}

并且在我的代码中似乎起作用:

private boolean matchToRelease(ListMultimap<MatchKey, MetadataChangedWrapper> matchKeyToSongs)
            throws JaikozException
    {
        if (stopTask)
        {
            MainWindow.logger.warning("Analyser stopped detected in matchToRelease");
            return false;
        }

        TimeoutThreadPoolExecutor es = getExecutorService();
        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(matchKeyToSongs.size());
        for(MatchKey matchKey:matchKeyToSongs.keySet())
        {
            List<MetadataChangedWrapper> songs = matchKeyToSongs.get(matchKey);
            futures.add(es.submit(new CorrectFromMusicBrainzWorker(this, stats, matchKey, songs)));
        }
        es.shutdown();
        try
        {
            es.awaitTermination(matchKeyToSongs.keySet().size() * es.getTimeout(), es.getTimeoutUnit());
        }
        catch(InterruptedException ie)
        {
            MainWindow.logger.warning(this.getClass() + " has been interrupted");
            return false;
        }
        return true;
    }

但是对于一个客户

---Thread Pool Queue is Empty

输出awaitTermination()不返回,仅在用户两小时后取消任务时最终返回-此处完整日志摘录

14/12/2014 20.44.19:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzWorker:getSongsNotMatched:SEVERE: /Volumes/2TB External/New iTunes Library/iTunes Media/Music/XTC:albumMetadataMatchingCounts11:AlreadyMatched:2:ToMatch:11
14/12/2014 20.44.19:com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor:afterExecute:SEVERE: ---Thread Pool Queue is Empty
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.ExecutorServiceEnabledAnalyser:cancelTask:WARNING: Cancelling class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser Task
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser:matchToRelease:WARNING: class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser has been interrupted

那么,即使日志显示队列为空,并且在Executor本身和嵌入式timeoutExecutor上都调用了shutdown(),那awaiTermination()也不返回是怎么回事?

我本人对此有一些想法,但不知道答案。

  1. 首先,为什么实际上有必要关闭TimeOutExecutor以使awaitTermination()返回。在我的子类中,awaitTermination()不会被覆盖,因此如果所有任务都已完成,那么TiumeOutExecutor(awaitTermination()一无所知)是否关闭有什么关系呢?

  2. 其次,为什么---线程池队列为空有时会多次获得输出

  3. TimeOutExecutor是单线程的,这是正确的/必要的吗?

根据Holgers答案进行更新

因此,您遇到的问题是您太早关闭了timeoutExecutor,因此可能会中断线程池执行程序的挂起任务,从而错过一项或多项任务。

对,现在我看到一个空队列仅表示所有任务已经完成或开始。(对不起,我的示例测试以前误导了它运行的临时任务超过10个,并且在生产代码中,worker的数量基于用户计算机上的cpus数)。

因此,您说的是我太早关闭()timeoutExecutor(可能正在运行多达WorkerSize -1任务),这意味着所有仍在为尚未完成的任务而运行的timeoutExecutors被中断。因此,如果其中任何一个由于某种原因未能完全完成,则它们的超时任务将不复存在,因此无法用于中断它们。但是awaitTermination()无法返回的唯一原因是,如果这些最后一个(WorkerSize -1)任务之一没有完成。

我自己决定将beforeExecute()更改为

protected void afterExecute(Runnable r, Throwable t) {
    ScheduledFuture timeoutTask = runningTasks.remove(r);
    if(timeoutTask != null) {
        timeoutTask.cancel(false);
    }
    if (isShutdown)
    {
        if(getQueue().isEmpty())
        {

            if(runningTasks.size()==0)
            {
                this.shutdownNow();
            }
        }
    }
}

为确保完成,我使用了shutdownNow(),但是直到一切都完成了,但是根据您的评论,这仍然可能无法正常工作

我应该做

protected void afterExecute(Runnable r, Throwable t) {
    ScheduledFuture timeoutTask = runningTasks.remove(r);
    if(timeoutTask != null) {
        timeoutTask.cancel(false);
    }
}

protected void terminated() 
{
    timeoutExecutor.shutdown();
}

并在所有提交的任务完成之后(自然地或通过相应的timeoutExecutor取消)立即调用并终止了()是否在这一点上仍然存在timeoutExecutor?

为了完全修改我的测试用例,除非超时任务正在运行,否则该任务将需要很长时间才能显示原始解决方案失败(挂起)并且修订后的解决方案正在运行

public void testThreadPoolTasksCancelled() throws Exception
    {
        Instant t1, t2;
        t1 = Instant.now();
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);

        for (int i = 0; i < 50; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(500000000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        t2 = Instant.now();
        System.out.println("Program done:"+(Duration.between(t1, t2).toMillis()/ 1000+ " seconds"));
    }
霍尔格

队列仅包含尚未开始的作业。拥有空队列并没有暗示没有挂起的工作; 它们可能只是为了执行而被删除。尤其是在您的示例代码中,一个空队列意味着没有正在运行的作业的假设是致命的错误。由于您将执行程序配置为具有十个核心线程并提交十个作业,因此在示例代码的整个执行过程中,队列始终为空。

因此,您所面临的问题是您timeoutExecutor过早关闭了方式,因此可能会中断线程池执行程序的挂起任务,从而错过一项或多项任务。

请注意,原则上,作业甚至可以从队列中删除(如果beforeExecute添加)但尚未被调用的状态。因此,即使队列为空且runningTasks映射为空也不能保证没有挂起的作业。


要回答其他问题,您必须关闭,timeoutExecutor因为它具有关联的活动线程,该线程将始终使执行程序保持活动状态。因此,不关闭它会导致内存泄漏并进一步保持线程活动,因此始终会阻止JVM自动关闭。

但是正确的关闭方法timeoutExecutor是重写该方法protected void terminated()而该方法正是用于清除的。


最后一点,您的线程数无关紧要,timeoutExecutor但是鉴于任务的简单性,拥有多个线程并没有任何好处,而单线程执行程序是最简单且可能是最有效的解决方案。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

为什么我的解析任务有时无法完成?

来自分类Dev

Kubelet 的 cAdvisor 指标端点不能可靠地返回所有指标

来自分类Dev

为什么PHP的sprintf无法可靠地进行5s舍入?

来自分类Dev

为什么Chrome无法可靠地加载3个简单的音频文件?

来自分类Dev

无法在VirtualBox上可靠地自动执行xrandr设置

来自分类Dev

为什么后台作业发送的USR1信号不能被等待在Bash脚本中完成的父Shell进程可靠地接收?

来自分类Dev

可靠地发送有关Celery任务成功/失败的邮件

来自分类Dev

如何检查在ExecutorService上运行的所有任务是否已完成

来自分类Dev

为什么我们不能一过就可靠地检测回文

来自分类Dev

为什么不可靠地调用UIApplicationDelegate方法`application(_:configurationForConnecting:options:)`

来自分类Dev

为什么基于非共享的杀死只能与--fork一起可靠地工作?

来自分类Dev

如何可靠地确定最近7天未使用的所有文件夹?

来自分类Dev

如何可靠地确定最近7天未使用的所有文件夹?

来自分类Dev

如何可靠地阻止Windows 10防火墙中的所有传入连接?

来自分类Dev

如何通过 KafkaConsumer 可靠地获取所有 kafka 主题消息

来自分类Dev

完成所有ExecutorService任务后,程序不会立即终止

来自分类Dev

为什么在 asyncio.wait() 中我明确表示只希望完成第一个任务时所有任务都完成了?

来自分类Dev

带有占位符的dbExecute无法与RSQLite可靠地一起使用

来自分类Dev

您如何能可靠地返回不可连接的值?

来自分类Dev

将“$!” 使用“&”可靠地返回正确的 ID?

来自分类Dev

Mocha WebSocket测试无法可靠地通过/失败

来自分类Dev

处理程序无法在Android中可靠地工作

来自分类Dev

无法可靠地确定服务器的标准域名

来自分类Dev

无法可靠地确定服务器的标准域名

来自分类Dev

无法可靠地传递NodeJS / SocketIO消息

来自分类Dev

在OSX上可靠地部署Delphi生成的Dylib

来自分类Dev

如何使Linux在多CPU机器上可靠地启动?

来自分类Dev

在OSX上可靠地部署Delphi生成的Dylib

来自分类Dev

我无法获得Pubnub在线状态超时来可靠地使用心跳值-我缺少什么?

Related 相关文章

  1. 1

    为什么我的解析任务有时无法完成?

  2. 2

    Kubelet 的 cAdvisor 指标端点不能可靠地返回所有指标

  3. 3

    为什么PHP的sprintf无法可靠地进行5s舍入?

  4. 4

    为什么Chrome无法可靠地加载3个简单的音频文件?

  5. 5

    无法在VirtualBox上可靠地自动执行xrandr设置

  6. 6

    为什么后台作业发送的USR1信号不能被等待在Bash脚本中完成的父Shell进程可靠地接收?

  7. 7

    可靠地发送有关Celery任务成功/失败的邮件

  8. 8

    如何检查在ExecutorService上运行的所有任务是否已完成

  9. 9

    为什么我们不能一过就可靠地检测回文

  10. 10

    为什么不可靠地调用UIApplicationDelegate方法`application(_:configurationForConnecting:options:)`

  11. 11

    为什么基于非共享的杀死只能与--fork一起可靠地工作?

  12. 12

    如何可靠地确定最近7天未使用的所有文件夹?

  13. 13

    如何可靠地确定最近7天未使用的所有文件夹?

  14. 14

    如何可靠地阻止Windows 10防火墙中的所有传入连接?

  15. 15

    如何通过 KafkaConsumer 可靠地获取所有 kafka 主题消息

  16. 16

    完成所有ExecutorService任务后,程序不会立即终止

  17. 17

    为什么在 asyncio.wait() 中我明确表示只希望完成第一个任务时所有任务都完成了?

  18. 18

    带有占位符的dbExecute无法与RSQLite可靠地一起使用

  19. 19

    您如何能可靠地返回不可连接的值?

  20. 20

    将“$!” 使用“&”可靠地返回正确的 ID?

  21. 21

    Mocha WebSocket测试无法可靠地通过/失败

  22. 22

    处理程序无法在Android中可靠地工作

  23. 23

    无法可靠地确定服务器的标准域名

  24. 24

    无法可靠地确定服务器的标准域名

  25. 25

    无法可靠地传递NodeJS / SocketIO消息

  26. 26

    在OSX上可靠地部署Delphi生成的Dylib

  27. 27

    如何使Linux在多CPU机器上可靠地启动?

  28. 28

    在OSX上可靠地部署Delphi生成的Dylib

  29. 29

    我无法获得Pubnub在线状态超时来可靠地使用心跳值-我缺少什么?

热门标签

归档