我正在使用spark从目录及其子目录中的单个RDD中读取HDFS中所有文件的数据。我找不到任何有效的方法来做到这一点。因此,我尝试编写一些自定义代码,如下所示:
public Object fetch(String source,String sink) {
//reading data
boolean isDir=new File(source).isDirectory();
System.out.println("isDir="+isDir);
JavaRDD<String> lines;
if(isDir)
{
lines=readFiles(new File(source).listFiles(), null);
}
else
lines= sc.textFile(source);
lines.saveAsTextFile(sink);
return true;
}
public static JavaRDD<String> readFiles(File[] files,JavaRDD<String> lines) {
for (File file : files) {
if (file.isDirectory()) {
readFiles(file.listFiles(),lines); // Calls same method again.
}
else {
if(lines==null)
lines=sc.textFile(file.getPath());
else
{
JavaRDD<String> r=sc.textFile(file.getPath());
lines.union(r);
}
}
}
return lines;
}
但这没有完成我的预期工作,因为isDir包含错误信息,表明它不是目录。请您指导我怎么了?有没有有效的方法来完成这项工作?非常感谢
由于spark可以基于Hadoop Job配置读取数据,因此可以使用该FileInputFormat#setInputDirRecursive
方法。
JavaSparkContext context = new JavaSparkContext();
Job job;
try {
job = Job.getInstance();
FileInputFormat.setInputPaths(job, new Path("/path/to/input/directory));
FileInputFormat.setInputDirRecursive(job, true);
} catch (IOException e1) {
e1.printStackTrace();
System.exit(1);
}
JavaRDD<Text> sourceData = context.newAPIHadoopRDD(job.getConfiguration(), TextInputFormat.class, LongWritable.class, Text.class)
.values();
显然,您最终将获得文本数据类型而不是字符串。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句