SparkR job times out after 100 minutes

香妮卡

I have written a somewhat complex SparkR script and run it with spark-submit. What the script basically does is read a large Hive/Impala-backed Parquet table row by row and generate a new Parquet file with the same number of rows. The job seems to stop after about 100 minutes, which looks like some kind of timeout.

  • For up to 500,000 rows the script works perfectly (because it takes less than 100 minutes).
  • For 1, 2, 3 or more million rows, the script exits after 100 minutes.

I checked all the values I know of and tested every possible parameter in the 100-minute range, but could not find any solution.
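For reference, here is a minimal sketch of the read/transform/write pattern described above (SparkR 1.5 API; the table path, output path and the omitted transformations are hypothetical, since the real script is not public):

library(SparkR)

# Initialize the context against YARN, matching the reported configuration
sc <- sparkR.init(master = "yarn-client", appName = "SparkR")
sqlContext <- sparkRHive.init(sc)

# Read the Hive/Impala-backed Parquet table (path is hypothetical)
df <- read.df(sqlContext, "/user/hive/warehouse/big_table", source = "parquet")

# ... row-wise transformations elided ...

# Write the result out as a new Parquet file; this is the write.df call
# that aborts in the log below
write.df(df, path = "/tmp/big_table_out", source = "parquet", mode = "overwrite")

sparkR.stop()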

[user@localhost R]$ time spark-submit sparkr-pre.R                           
Loading required package: methods

Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    filter, na.omit

The following objects are masked from ‘package:base’:

    intersect, rbind, sample, subset, summary, table, transform

15/12/30 18:04:27 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
[Stage 1:========================================>                 (7 + 3) / 10]Error in if (returnStatus != 0) { : argument is of length zero
Calls: write.df -> write.df -> .local -> callJMethod -> invokeJava
Execution halted
15/12/30 19:44:52 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1514)
        at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1438)
        at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1724)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1723)
        at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:587)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:264)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1855)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132)
        at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79)
        at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
15/12/30 19:44:52 ERROR DefaultWriterContainer: Job job_201512301804_0000 aborted.
15/12/30 19:44:52 ERROR RBackendHandler: save on 25 failed

real    100m30.944s
user    1m26.326s
sys     0m19.459s

Environment: Runtime Information

Name    Value
Java Home   /usr/java/jdk1.8.0_40/jre
Java Version    1.8.0_40 (Oracle Corporation)
Scala Version   version 2.10.4
Spark Properties

Name    Value
spark.akka.frameSize    1024
spark.app.id    application_1451466100034_0019
spark.app.name  SparkR
spark.driver.appUIAddress   http://x.x.x.x:4040
spark.driver.host   x.x.x.x
spark.driver.maxResultSize  8G
spark.driver.memory 100G
spark.driver.port   60471
spark.executor.id   driver
spark.executor.memory   14G
spark.executorEnv.LD_LIBRARY_PATH   $LD_LIBRARY_PATH:/usr/lib64/R/lib:/usr/local/lib64:/usr/lib/jvm/jre/lib/amd64/server:/usr/lib/jvm/jre/lib/amd64:/usr/lib/jvm/java/lib/amd64:/usr/java/packages/lib/amd64:/lib:/usr/lib::/usr/lib/hadoop/lib/native
spark.externalBlockStore.folderName spark-b60f685e-c46c-435d-ab1b-c9d1279f630f
spark.fileserver.uri    http://x.x.x.x:50281
spark.home  /datas/spark-1.5.2-bin-hadoop2.6
spark.kryoserializer.buffer.max 2000M
spark.master    yarn-client
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS  CDHPR1.dc.dialog.lk
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES  http://CDHPR1.dc.dialog.lk:8088/proxy/application_1451466100034_0019
spark.scheduler.mode    FIFO
spark.serializer    org.apache.spark.serializer.KryoSerializer
spark.sql.parquet.binaryAsString    true
spark.submit.deployMode client
spark.ui.filters    org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.yarn.dist.archives    file:/datas/spark-1.5.2-bin-hadoop2.6/R/lib/sparkr.zip#sparkr
spark.yarn.dist.files   file:/home/inuser/R/sparkr-pre.R
System Properties

Name    Value
SPARK_SUBMIT    true
SPARK_YARN_MODE true
awt.toolkit sun.awt.X11.XToolkit
file.encoding   UTF-8
file.encoding.pkg   sun.io
file.separator  /
java.awt.graphicsenv    sun.awt.X11GraphicsEnvironment
java.awt.printerjob sun.print.PSPrinterJob
java.class.version  52.0
java.endorsed.dirs  /usr/java/jdk1.8.0_40/jre/lib/endorsed
java.ext.dirs   /usr/java/jdk1.8.0_40/jre/lib/ext:/usr/java/packages/lib/ext
java.home   /usr/java/jdk1.8.0_40/jre
java.io.tmpdir  /tmp
java.library.path   :/usr/lib/hadoop/lib/native:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.runtime.name   Java(TM) SE Runtime Environment
java.runtime.version    1.8.0_40-b26
java.specification.name Java Platform API Specification
java.specification.vendor   Oracle Corporation
java.specification.version  1.8
java.vendor Oracle Corporation
java.vendor.url http://java.oracle.com/
java.vendor.url.bug http://bugreport.sun.com/bugreport/
java.version    1.8.0_40
java.vm.info    mixed mode
java.vm.name    Java HotSpot(TM) 64-Bit Server VM
java.vm.specification.name  Java Virtual Machine Specification
java.vm.specification.vendor    Oracle Corporation
java.vm.specification.version   1.8
java.vm.vendor  Oracle Corporation
java.vm.version 25.40-b25
line.separator  
os.arch amd64
os.name Linux
os.version  2.6.32-431.el6.x86_64
path.separator  :
sun.arch.data.model 64
sun.boot.class.path /usr/java/jdk1.8.0_40/jre/lib/resources.jar:/usr/java/jdk1.8.0_40/jre/lib/rt.jar:/usr/java/jdk1.8.0_40/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_40/jre/lib/jsse.jar:/usr/java/jdk1.8.0_40/jre/lib/jce.jar:/usr/java/jdk1.8.0_40/jre/lib/charsets.jar:/usr/java/jdk1.8.0_40/jre/lib/jfr.jar:/usr/java/jdk1.8.0_40/jre/classes
sun.boot.library.path   /usr/java/jdk1.8.0_40/jre/lib/amd64
sun.cpu.endian  little
sun.cpu.isalist 
sun.io.unicode.encoding UnicodeLittle
sun.java.command    org.apache.spark.deploy.SparkSubmit sparkr-pre.R
sun.java.launcher   SUN_STANDARD
sun.jnu.encoding    UTF-8
sun.management.compiler HotSpot 64-Bit Tiered Compilers
sun.nio.ch.bugLevel 
sun.os.patch.level  unknown
user.country    US
user.dir    /home/user/R
user.home   /home/user
user.language   en
user.name   inuser
user.timezone   Asia/Colombo
Classpath Entries

Resource    Source
/datas/spark-1.5.2-bin-hadoop2.6/conf/  System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/conf/yarn-conf/    System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar  System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar    System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar    System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar   System Classpath

spark-defaults.conf

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
# spark.driver.memory              5g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
#
spark.master            yarn-client
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.driver.memory             100G
spark.executor.memory           14G 
spark.sql.parquet.binaryAsString true
spark.kryoserializer.buffer.max 2000M
spark.driver.maxResultSize      8G
spark.akka.frameSize            1024
#spark.executor.instances       16

I cannot share the SparkR script publicly, I am really sorry. But it works perfectly whenever the code takes less than 100 minutes to complete.

CmdNtrf

It is a known bug in the Spark 1.6.0 release, see: https://issues.apache.org/jira/browse/SPARK-12609. A quick review of the SparkR code also shows that the bug has actually been present since Spark 1.4.0.

Until a patched release ships, the quick and dirty solution is to increase the timeout. As pointed out in the issue, the problematic function is connectBackend. That function can be patched at runtime with assignInNamespace.

The following code retrieves the original function, then wraps it in a second function for which the timeout value is increased to 48 hours. The original function is then replaced by the wrapper.

# Grab the original connectBackend from the SparkR namespace
connectBackend.orig <- getFromNamespace('connectBackend', pos='package:SparkR')

# Wrapper that raises the default timeout to 48 hours (in seconds)
connectBackend.patched <- function(hostname, port, timeout = 3600*48) {
   connectBackend.orig(hostname, port, timeout)
}

# Replace the original function with the wrapper
assignInNamespace("connectBackend", value=connectBackend.patched, pos='package:SparkR')

Put this code right after loading the SparkR package.
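For example, the top of the script would look like the sketch below. Note that the patch has to run before sparkR.init(), since that is the call that invokes connectBackend; the hard-coded default in SparkR 1.5 appears to be 6000 seconds, which is exactly the 100 minutes observed above.

library(SparkR)

# ... insert the connectBackend patch from above here ...

# Optional sanity check: the patched default should now read 3600 * 48
formals(getFromNamespace("connectBackend", "SparkR"))$timeout

# Only now create the context; sparkR.init() connects to the JVM backend
# through the patched connectBackend
sc <- sparkR.init(master = "yarn-client", appName = "SparkR")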

Another solution is to modify the timeout in the SparkR source code and recompile it. For build instructions, see: https://github.com/apache/spark/blob/branch-1.6/R/install-dev.sh
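Concretely, that route would be something like the following (a sketch; the file location is assumed from the branch-1.6 source tree):

# 1. In the Spark source tree, edit R/pkg/R/client.R and raise the
#    `timeout` default of connectBackend.
# 2. Rebuild and reinstall the SparkR package with ./R/install-dev.sh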
