我需要在每个map()中读取一个不同的文件,该文件在HDFS中
val rdd=sc.parallelize(1 to 10000)
val rdd2=rdd.map{x=>
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://ITS-Hadoop10:9000/"), new org.apache.hadoop.conf.Configuration())
val path=new Path("/user/zhc/"+x+"/")
val t=hdfs.listStatus(path)
val in =hdfs.open(t(0).getPath)
val reader = new BufferedReader(new InputStreamReader(in))
var l=reader.readLine()
}
rdd2.count
我的问题是这段代码
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://ITS-Hadoop10:9000/"), new org.apache.hadoop.conf.Configuration())
需要太多的运行时间,每次map()都需要创建一个新的FileSystem值。我可以将这段代码放在map()函数之外,这样就不必每次都创建hdfs了吗?或者如何在map()中快速读取文件?
我的代码在多台机器上运行。谢谢!
在您的情况下,我建议使用wholeTextFiles
wich方法将返回pairRdd,其键是文件的完整路径,而值是字符串中文件的内容。
val filesPariRDD = sc.wholeTextFiles("hdfs://ITS-Hadoop10:9000/")
val filesLineCount = filesPariRDD.map( x => (x._1, x._2.length ) ) //this will return a map of fileName , number of lines of each file. You could apply any other function on the file contents
filesLineCount.collect()
编辑
如果文件位于同一目录下的目录中(如注释中所述),则可以使用某种正则表达式
val filesPariRDD = sc.wholeTextFiles("hdfs://ITS-Hadoop10:9000/*/")
希望这是清楚和有用的
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句