To summarize: I have a collection of roughly 1 TB in MongoDB. I tried loading it into Spark with the mongo connector, but after about 18 minutes of execution I keep hitting a stack overflow.
java.lang.StackOverflowError:
at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
....
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
16/06/29 08:42:22 INFO YarnAllocator: Driver requested a total number of 54692 executor(s).
16/06/29 08:42:22 INFO YarnAllocator: Will request 46501 executor containers, each with 4 cores and 5068 MB memory including 460 MB overhead
Is this because I haven't given it enough memory? Or should I provide more storage? I tried adding a checkpoint, but it didn't help (see the sketch after the code below). I changed some values in the code because they relate to my company's database, but for the purposes of this question the code as a whole is still representative.
// Imports assumed for the Stratio spark-mongodb connector (package paths may vary by version):
import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.config._
import com.stratio.datasource.mongodb.config.MongodbConfig._
import org.apache.spark.sql.{Row, SQLContext}
import java.sql.Date // assuming u_at/c_at are DATE columns; use java.sql.Timestamp for TIMESTAMP

val sqlContext = new SQLContext(sc)

// Read configuration for the 1 TB collection.
val builder = MongodbConfigBuilder(Map(
  Host -> List("mymongodurl:mymongoport"),
  Database -> "mymongoddb",
  Collection -> "mymongocollection",
  SamplingRatio -> 0.01,
  WriteConcern -> "normal"))
val readConfig = builder.build()

// Load the collection and expose it to Spark SQL.
val mongoRDD = sqlContext.fromMongoDB(readConfig)
mongoRDD.registerTempTable("mytable")
val dataFrame = sqlContext.sql("SELECT u_at, c_at FROM mytable")

// Keep only rows where both dates are present and differ.
val deltaCollect = dataFrame.filter("u_at is not null and c_at is not null and u_at != c_at").rdd

// Map each row to (delta in days, 1); rows whose underlying times are equal go to key "0".
val mapDelta = deltaCollect.map {
  case Row(u_at: Date, c_at: Date) =>
    if (u_at.getTime == c_at.getTime) {
      (0.toString, 0L)
    } else {
      val delta = (u_at.getTime - c_at.getTime) / 1000 / 60 / 60 / 24
      (delta.toString, 1L)
    }
}

// Count rows per day-delta and write the histogram out as text.
val reduceRet = mapDelta.reduceByKey(_ + _)
val OUTPUT_PATH = "./dump"
reduceRet.saveAsTextFile(OUTPUT_PATH)
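For completeness, this is roughly how checkpointing is typically enabled; a minimal sketch, and the checkpoint directory path is just a placeholder:

// Checkpointing truncates the RDD lineage by persisting to reliable storage.
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // placeholder directory
deltaCollect.checkpoint() // materialized on the next action over deltaCollect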
I added another Java option, "-Xss32m", to the Spark driver to raise the per-thread stack size, and the exception is no longer thrown. How silly of me, I should have tried that sooner. Another problem has shown up, though, so I will have to dig further. Still, thanks a lot for the help.
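In case it helps anyone else: the driver JVM has to receive the option before it starts, so in practice "-Xss32m" goes on the spark-submit command line rather than into SparkConf; executor-side stacks can be raised programmatically. A minimal sketch (the 32 MB value is just what worked for me):

// Driver-side stack size must be set at launch time, for example:
//   spark-submit --conf spark.driver.extraJavaOptions=-Xss32m ...
// Executor-side stacks can be raised before the SparkContext is created:
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-Xss32m") // larger per-thread stacks on executors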