EMA函数在R数据帧上有效,但在Spark数据帧上失败-Sparklyr

SM2019

我对R和Spark比较陌生。我正在编写一个函数来计算一组数据的指数移动平均值。我正在使用sparklyr软件包在Databricks Spark平台上工作。

我编写了一个在普通R数据帧上工作的函数。但是,将其应用于Spark数据框时会失败。

我目前对值的正确性不感兴趣(我正在使用伪值-例如init = 10是任意的)。我对将其用于Spark数据框更感兴趣

library(sparklyr)
library(dplyr)
library(stats)

sc <- spark_connect(method = "databricks") 

set.seed(21)
#data
x <- rnorm(1e4)
#data in a dataframe
x_df <- data.frame(x)
#data in a Spark dataframe
x_sprk <- copy_to(sc, x_df, name ="x_sql", overwrite = TRUE)

#function to calculate Exponential moving average

ewma_filter <- function (df, ratio = 0.9) {
  mutate(df, ema = c(stats::filter(x * ratio, 1 - ratio, "recursive", init = 10)))
}

当我在R数据帧上运行此函数时,它工作正常

y_df <- x_df %>% ewma_filter()

输出:

                 x           ema
1     0.6785634656  1.6107071191
2    -0.8519017349 -0.6056408495
3    -0.0362643838 -0.0932020304
4     0.2422350575  0.2086913487
5    -1.0401144499 -0.9152338701
6     1.4521621543  1.2154225519
7    -0.8531140006 -0.6462603453
8     0.4779933902  0.3655680167
9     1.0719294487  1.0012933055
10   -0.4115495580 -0.2702652716
11    2.4152301588  2.1466806157
12   -0.1045401223  0.1205819515
13   -0.1632591646 -0.1348750530
14   -2.1441820131 -1.9432513170
15    0.4672471535  0.2261973065
16    0.9362099384  0.8652086752
17    0.6494043831  0.6709848123
18    2.5609202716  2.3719267257

但是当我在Spark数据帧上尝试时,没有得到预期的输出:

y_sprk <- x_sprk %>% ewma_filter()

输出:

         x ema            

 1  0.679  
 2 -0.852  
 3 -0.0363 
 4  0.242  
 5 -1.04   
 6  1.45   
 7 -0.853  
 8  0.478  
 9  1.07   
10 -0.412  
# … with more rows

我尝试使用spark_apply():

y_sprk <- spark_apply(x_sprk, ewma_filter, columns = list(x = "numeric", ema = "numeric"))

我收到以下错误:

Error : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 115.0 failed 4 times, most recent failure: Lost task 0.3 in stage 115.0 (TID 8623, 10.139.64.6, executor 0): java.lang.Exception: sparklyr worker rscript failure with status 255, check worker logs for details.
    at sparklyr.Rscript.init(rscript.scala:106)
    at sparklyr.WorkerApply$$anon$2.run(workerapply.scala:116)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2355)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2343)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2342)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2342)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1096)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2574)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2240)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:270)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:280)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:80)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:86)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:55)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectResult(Dataset.scala:2828)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3440)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2795)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2795)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3424)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3419)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3419)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2795)
    at sparklyr.Utils$.collect(utils.scala:204)
    at sparklyr.Utils.collect(utils.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke.invoke(invoke.scala:139)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66)
    at sparklyr.BackendHandler.channelRead0(handler.scala:51)
    at sparklyr.BackendHandler.channelRead0(handler.scala:4)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: sparklyr worker rscript failure with status 255, check worker logs for details.
    at sparklyr.Rscript.init(rscript.scala:106)
    at sparklyr.WorkerApply$$anon$2.run(workerapply.scala:116)

如果您可以帮助调试它并使它在spark数据帧上工作,我将不胜感激。

拉斐尔K

你很亲密!spark_apply()默认情况下可在Spark DataFrame的每个分区上运行,这对您要尝试执行的操作很好。您收到的错误消息并不能告诉您太多信息-要真正了解发生了什么,您实际上必须在中的工作程序节点日志中查找stdout在Databricks上,您可以在“ Spark UI-Master”下的“ Clusters UI”中找到它,然后钻入工作节点。

对于您的代码,错误消息实际上是 19/11/10 19:38:25 ERROR sparklyr: RScript (2719) terminated unexpectedly: could not find function "mutate"

似乎mutate找不到,这很奇怪,但是这些UDF的工作方式是在工作节点上创建一个R进程,并且要使该函数正常工作,所有代码/库也必须在这些节点上可用。由于您在Databricks上运行,并且dplyr包含在Databricks Runtime中,因此它在所有辅助节点上都可用。您只需要引用名称空间或加载完整的库:

library(sparklyr)
library(dplyr)
library(stats)

sc <- spark_connect(method = "databricks") 

# Create R dataframe
set.seed(21)
x <- rnorm(1e4)
x_df <- data.frame(x)

# Push R dataframe to Spark
x_sprk <- copy_to(sc, x_df, name ="x_sql", overwrite = TRUE)

# Distribute the R code across each partition
spark_apply(x_sprk, function(x) {

  # Define moving average function and reference dplyr explicitly
  ewma_filter <- function (df, ratio = 0.9) {
  dplyr::mutate(df, ema = c(stats::filter(x * ratio, 1 - ratio, "recursive", init = 10)))
  }

  # Apply it to each partition of the Spark DF
  ewma_filter(x)
})

这些是此调用的结果spark_apply()

# Source: spark<?> [?? x 2]
          x      ema
      <dbl>    <dbl>
 1  0.793    1.71   
 2  0.522    0.641  
 3  1.75     1.64   
 4 -1.27    -0.981  
 5  2.20     1.88   
 6  0.433    0.578  
 7 -1.57    -1.36   
 8 -0.935   -0.977  
 9  0.0635  -0.0406 
10 -0.00239 -0.00621
# … with more rows

R用户指南》中也对此进行了介绍

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

希望使用SparklyR按索引对Spark数据帧进行排序

来自分类Dev

等效于使用 sparklyr 的火花表的“str()”(描述数据帧)

来自分类Dev

根据列数据类型对 spark 数据帧(在 sparklyr 中)进行子集化的最佳方法是什么

来自分类Dev

适用于 Sparklyr 的 Spark 兼容版本

来自分类Dev

列出Spark数据库中的表名称-sparklyR

来自分类Dev

使用sparklyr在R中调用Spark窗口函数

来自分类Dev

如何在 sparklyr 中重新分区数据框

来自分类Dev

如何形成基于词汇表的 tfidf sparklyr 数据框

来自分类Dev

Sparklyr 无法在 spark_apply 中引用表

来自分类Dev

R从Sparklyr中的ALS实施中提取潜在因素

来自分类Dev

在R中的长数据帧上有效使用功能

来自分类Dev

Sparklyr,spark_read_csv,每次都要重新导入数据吗?

来自分类Dev

有没有办法用sparklyr处理嵌套数据?

来自分类Dev

sparklyr:spark_apply 函数在集群模式下不起作用

来自分类Dev

RPR / Sparklyr在MAPR / Spark上-替换为。在字符串中

来自分类Dev

代号一个数据上载在Android手机上有效,但在Iphone 4上失败

来自分类Dev

代号一个数据上载在Android手机上有效,但在Iphone 4上失败

来自分类Dev

是否可以在Sparklyr中使用本机R代码或其他R包函数?

来自分类Dev

R:有效检查数据帧中的相邻元素

来自分类Dev

使用groupby在大型数据帧上有效地进行Fillna(正向填充)?

来自分类Dev

Sparklyr:如何计算 2 个 Spark 表之间的相关系数?

来自分类Dev

通过 Sparklyr 在本地模式下运行 Spark 时如何配置驱动程序内存?

来自分类Dev

sparklyr spark_read_parquet 将字符串字段读取为列表

来自分类常见问题

有没有办法在Sparklyr中使用ml_linear_regression显示标准错误?

来自分类Dev

有没有办法在Sparklyr中使用ml_linear_regression显示标准错误?

来自分类Dev

可以将Sparklyr与部署在纱线管理的Hadoop集群上的火花一起使用吗?

来自分类Dev

使用 sparklyr 和 R 获取活动火花执行器的数量

来自分类Dev

sparklyr - 安装后出错

来自分类Dev

r sparklyr spark_apply 错误:org.apache.spark.sql.AnalysisException:引用“id”不明确

Related 相关文章

  1. 1

    希望使用SparklyR按索引对Spark数据帧进行排序

  2. 2

    等效于使用 sparklyr 的火花表的“str()”(描述数据帧)

  3. 3

    根据列数据类型对 spark 数据帧(在 sparklyr 中)进行子集化的最佳方法是什么

  4. 4

    适用于 Sparklyr 的 Spark 兼容版本

  5. 5

    列出Spark数据库中的表名称-sparklyR

  6. 6

    使用sparklyr在R中调用Spark窗口函数

  7. 7

    如何在 sparklyr 中重新分区数据框

  8. 8

    如何形成基于词汇表的 tfidf sparklyr 数据框

  9. 9

    Sparklyr 无法在 spark_apply 中引用表

  10. 10

    R从Sparklyr中的ALS实施中提取潜在因素

  11. 11

    在R中的长数据帧上有效使用功能

  12. 12

    Sparklyr,spark_read_csv,每次都要重新导入数据吗?

  13. 13

    有没有办法用sparklyr处理嵌套数据?

  14. 14

    sparklyr:spark_apply 函数在集群模式下不起作用

  15. 15

    RPR / Sparklyr在MAPR / Spark上-替换为。在字符串中

  16. 16

    代号一个数据上载在Android手机上有效,但在Iphone 4上失败

  17. 17

    代号一个数据上载在Android手机上有效,但在Iphone 4上失败

  18. 18

    是否可以在Sparklyr中使用本机R代码或其他R包函数?

  19. 19

    R:有效检查数据帧中的相邻元素

  20. 20

    使用groupby在大型数据帧上有效地进行Fillna(正向填充)?

  21. 21

    Sparklyr:如何计算 2 个 Spark 表之间的相关系数?

  22. 22

    通过 Sparklyr 在本地模式下运行 Spark 时如何配置驱动程序内存?

  23. 23

    sparklyr spark_read_parquet 将字符串字段读取为列表

  24. 24

    有没有办法在Sparklyr中使用ml_linear_regression显示标准错误?

  25. 25

    有没有办法在Sparklyr中使用ml_linear_regression显示标准错误?

  26. 26

    可以将Sparklyr与部署在纱线管理的Hadoop集群上的火花一起使用吗?

  27. 27

    使用 sparklyr 和 R 获取活动火花执行器的数量

  28. 28

    sparklyr - 安装后出错

  29. 29

    r sparklyr spark_apply 错误:org.apache.spark.sql.AnalysisException:引用“id”不明确

热门标签

归档