Processing large amounts of data with Grails and GPars

mmjmanders

I have a Grails application that runs a job every night at midnight. In my sample application I have 10,000 `Person` records, and in a Quartz job I do the following:

package threading

import static grails.async.Promises.task
import static groovyx.gpars.GParsExecutorsPool.withPool

class ComplexJob {
    static triggers = {
        simple repeatInterval: 30 * 1000l
    }

    def execute() {
        if (Person.count == 5000) {
            println "Executing job"                
            withPool 10000, {
                Person.listOrderByAge(order: "asc").each { p ->
                    task {
                        log.info "Started ${p}"
                        Thread.sleep(15000l - (-1 * p.age))
                    }.onComplete {
                        log.info "Completed ${p}"
                    }
                }
            }                
        }
    }
}

Ignore the `repeatInterval`; it is only there for testing purposes. When the job executes, I get the following exception:

2014-11-14 16:11:51,880 quartzScheduler_Worker-3 grails.plugins.quartz.listeners.ExceptionPrinterJobListener - Exception occurred in job: Grails Job
org.quartz.JobExecutionException: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000 [See nested exception: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000]
    at grails.plugins.quartz.GrailsJobFactory$GrailsJob.execute(GrailsJobFactory.java:111)
    at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
    at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
Caused by: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000
    at org.grails.async.factory.gpars.LoggingPoolFactory$3.rejectedExecution(LoggingPoolFactory.groovy:100)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at groovyx.gpars.scheduler.DefaultPool.execute(DefaultPool.java:155)
    at groovyx.gpars.group.PGroup.task(PGroup.java:305)
    at groovyx.gpars.group.PGroup.task(PGroup.java:286)
    at groovyx.gpars.dataflow.Dataflow.task(Dataflow.java:93)
    at org.grails.async.factory.gpars.GparsPromise.<init>(GparsPromise.groovy:41)
    at org.grails.async.factory.gpars.GparsPromiseFactory.createPromise(GparsPromiseFactory.groovy:68)
    at grails.async.Promises.task(Promises.java:123)
    at threading.ComplexJob$_execute_closure1_closure3.doCall(ComplexJob.groovy:20)
    at threading.ComplexJob$_execute_closure1.doCall(ComplexJob.groovy:19)
    at groovyx.gpars.GParsExecutorsPool$_withExistingPool_closure2.doCall(GParsExecutorsPool.groovy:192)
    at groovyx.gpars.GParsExecutorsPool.withExistingPool(GParsExecutorsPool.groovy:191)
    at groovyx.gpars.GParsExecutorsPool.withPool(GParsExecutorsPool.groovy:162)
    at groovyx.gpars.GParsExecutorsPool.withPool(GParsExecutorsPool.groovy:136)
    at threading.ComplexJob.execute(ComplexJob.groovy:18)
    at grails.plugins.quartz.GrailsJobFactory$GrailsJob.execute(GrailsJobFactory.java:104)
    ... 2 more
2014-11-14 16:12:06,756 Actor Thread 20 org.grails.async.factory.gpars.LoggingPoolFactory - Async execution error: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed.
java.lang.IllegalStateException: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed.
    at groovyx.gpars.dataflow.expression.DataflowExpression.bind(DataflowExpression.java:368)
    at groovyx.gpars.group.PGroup$4.run(PGroup.java:315)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
2014-11-14 16:12:06,756 Actor Thread 5 org.grails.async.factory.gpars.LoggingPoolFactory - Async execution error: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed.
java.lang.IllegalStateException: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed.
    at groovyx.gpars.dataflow.expression.DataflowExpression.bind(DataflowExpression.java:368)
    at groovyx.gpars.group.PGroup$4.run(PGroup.java:315)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

It seems the thread pool is not actually set to 10000 even though I use `withPool(10000)`. Can this computation (which for now only prints log statements) be executed in chunks? If so, how do I know what was most recently processed (i.e., where to pick up from)?

Denis Borovikov

Wrapping the processing of each element in its own task does not look optimal. The standard approach to parallel processing is to split the whole job into an appropriate number of subtasks. You start by choosing that number: for a CPU-bound job you can create N = number-of-processors tasks. Then you split the work into N sublists, like this:

import static grails.async.Promises.task
import static groovyx.gpars.GParsExecutorsPool.withPool

def persons = Person.listOrderByAge(order: "asc")
int nThreads = Runtime.getRuntime().availableProcessors()
// integer chunk size; collate() keeps any remainder in a final, shorter chunk
int chunkSize = Math.max(1, persons.size().intdiv(nThreads))
withPool(nThreads) {
    persons.collate(chunkSize).each { subList ->
        task {
            subList.each { p ->
                // ... process p here
            }
        }
    }
}
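The same split-into-N-chunks idea can be sketched in plain Java with a fixed-size `ExecutorService`: one task per chunk, never one task per record, so the pool size stays bounded regardless of how many records there are. This is a minimal sketch, not the GPars API; the `ChunkedProcessing` class name and `partition` helper are made up for illustration, and a list of integers stands in for the `Person` records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ChunkedProcessing {

    // Split a list into roughly nChunks sublists of equal size.
    static <T> List<List<T>> partition(List<T> items, int nChunks) {
        List<List<T>> chunks = new ArrayList<>();
        int chunkSize = Math.max(1, (int) Math.ceil(items.size() / (double) nChunks));
        for (int i = 0; i < items.size(); i += chunkSize) {
            chunks.add(items.subList(i, Math.min(i + chunkSize, items.size())));
        }
        return chunks;
    }

    public static void main(String[] args) throws InterruptedException {
        // Synthetic stand-in for the 10,000 Person records.
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) records.add(i);

        int nThreads = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        AtomicInteger processed = new AtomicInteger();

        // Submit one task per chunk; each task walks its own sublist.
        for (List<Integer> chunk : partition(records, nThreads)) {
            pool.submit(() -> chunk.forEach(r -> processed.incrementAndGet()));
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("processed " + processed.get() + " records");
    }
}
```

With this shape the number of queued tasks equals the number of chunks, so a pool rejection like the one in the stack trace above cannot occur no matter how many records are loaded.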

