缓存的Spark RDD(从序列文件读取)具有无效的条目,我该如何解决?

汤姆·古达(Thamme Gowda)

我正在使用Spark(v1.6.1)阅读Hadoop序列文件。缓存RDD后,RDD中的内容变为无效(最后一个条目重复n两次)。

这是我的代码段:

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]) {
    val seqfile = "data-1.seq"
    val conf: SparkConf = new SparkConf()
      .setAppName("..Buffer..")
      .setMaster("local")
      .registerKryoClasses(Array(classOf[Text]))
    val sc = new SparkContext(conf)

    sc.parallelize((0 to 1000).toSeq) //creating a sample sequence file
      .map(i => (new Text(s"$i"), new Text(s"${i*i}")))
      .saveAsHadoopFile(seqfile, classOf[Text], classOf[Text],
        classOf[SequenceFileOutputFormat[Text, Text]])

    val c = sc.sequenceFile(seqfile, classOf[Text], classOf[Text])
      .cache()
      .map(t => {println(t); t})
      .collectAsMap()
    println(c)
    println(c.size)

    sc.stop()
  }
}

输出:

(1000,1000000)
(1000,1000000)
(1000,1000000)
(1000,1000000)
(1000,1000000)
...... //Total 1000 lines with same content as above ...
Map(1000 -> 1000000)
1

编辑:对于将来的访问者:如果您像我在上面的代码片段中一样阅读序列文件,请参阅接受的答案。一个简单的解决方法是制作HadoopWritable实例的副本

val c = sc.sequenceFile(seqfile, classOf[Text], classOf[Text])
  .map(t =>(new Text(t._1), new Text(t._2)))   //Make copy of writable instances
zhang zhan

请参考sequenceFile中的注释。

/** Get an RDD for a Hadoop SequenceFile with given key and value types.
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a `map` function.
 */

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

缓存的Spark RDD(从序列文件读取)具有无效的条目,我该如何解决?

来自分类Dev

Spark缓存:仅RDD缓存了8%

来自分类Dev

Spark中的缓存如何工作

来自分类Dev

Spark Streaming:如何定期刷新缓存的RDD?

来自分类Dev

Spark Streaming:如何定期刷新缓存的RDD?

来自分类Dev

Spark RDD缓存会走多远?

来自分类Dev

如何从Spark Streaming数据延迟构建缓存

来自分类Dev

如何从Spark Streaming数据延迟构建缓存

来自分类Dev

Spark列出所有缓存的RDD名称,并且不持久

来自分类Dev

如何通过Spark控制RDD的隐式缓存?

来自分类Dev

在Apache Spark中缓存RDD的目的是什么?

来自分类Dev

Spark:缓存要在其他作业中使用的RDD

来自分类Dev

为什么我必须明确告诉Spark要缓存什么?

来自分类Dev

如何在Spark中将文件部分作为RDD缓存在内存中?

来自分类Dev

Spark历史记录WebUI上未显示Spark缓存RDD-存储

来自分类Dev

Spark Scala:如何强制Spark重新计算一些结果(不使用其缓存)

来自分类Dev

Spark-Hive 错误,我该如何解决?

来自分类Dev

如果我在Spark中两次缓存相同的RDD,会发生什么情况?

来自分类Dev

Spark SQL:如何在不使用rdd.cache()的情况下缓存SQL查询结果

来自分类Dev

getPersistentRDDs在Spark 2.2.0中返回缓存的RDD和DataFrame的映射,但是在Spark 2.4.7中-它仅返回缓存的RDD的Map

来自分类Dev

Spark RDD是否缓存在工作程序节点或驱动程序节点(或两者)上?

来自分类Dev

spark-在本地主机上执行时渴望加载和缓存RDD

来自分类Dev

有没有一种惯用的方式来缓存Spark数据帧?

来自分类Dev

spark中的memory_only和memory_and_disk缓存级别有什么区别?

来自分类Dev

在内存中缓存Spark数据帧是否有额外的开销?

来自分类Dev

如何解决本地缓存?

来自分类Dev

如何缓存Spark数据帧并在另一个脚本中引用它

来自分类Dev

如何解决 ignitecheckedexception:确保缓存配置中的所有对象都是可序列化的

来自分类Dev

了解Spark的缓存

Related 相关文章

  1. 1

    缓存的Spark RDD(从序列文件读取)具有无效的条目,我该如何解决?

  2. 2

    Spark缓存:仅RDD缓存了8%

  3. 3

    Spark中的缓存如何工作

  4. 4

    Spark Streaming:如何定期刷新缓存的RDD?

  5. 5

    Spark Streaming:如何定期刷新缓存的RDD?

  6. 6

    Spark RDD缓存会走多远?

  7. 7

    如何从Spark Streaming数据延迟构建缓存

  8. 8

    如何从Spark Streaming数据延迟构建缓存

  9. 9

    Spark列出所有缓存的RDD名称,并且不持久

  10. 10

    如何通过Spark控制RDD的隐式缓存?

  11. 11

    在Apache Spark中缓存RDD的目的是什么?

  12. 12

    Spark:缓存要在其他作业中使用的RDD

  13. 13

    为什么我必须明确告诉Spark要缓存什么?

  14. 14

    如何在Spark中将文件部分作为RDD缓存在内存中?

  15. 15

    Spark历史记录WebUI上未显示Spark缓存RDD-存储

  16. 16

    Spark Scala:如何强制Spark重新计算一些结果(不使用其缓存)

  17. 17

    Spark-Hive 错误,我该如何解决?

  18. 18

    如果我在Spark中两次缓存相同的RDD,会发生什么情况?

  19. 19

    Spark SQL:如何在不使用rdd.cache()的情况下缓存SQL查询结果

  20. 20

    getPersistentRDDs在Spark 2.2.0中返回缓存的RDD和DataFrame的映射,但是在Spark 2.4.7中-它仅返回缓存的RDD的Map

  21. 21

    Spark RDD是否缓存在工作程序节点或驱动程序节点(或两者)上?

  22. 22

    spark-在本地主机上执行时渴望加载和缓存RDD

  23. 23

    有没有一种惯用的方式来缓存Spark数据帧?

  24. 24

    spark中的memory_only和memory_and_disk缓存级别有什么区别?

  25. 25

    在内存中缓存Spark数据帧是否有额外的开销?

  26. 26

    如何解决本地缓存?

  27. 27

    如何缓存Spark数据帧并在另一个脚本中引用它

  28. 28

    如何解决 ignitecheckedexception:确保缓存配置中的所有对象都是可序列化的

  29. 29

    了解Spark的缓存

热门标签

归档