在Spark Java API中以递归方式从HDFS读取所有文件

哈菲兹·穆贾迪德(Hafiz Mujadid)

我正在使用spark从目录及其子目录中的单个RDD中读取HDFS中所有文件的数据。我找不到任何有效的方法来做到这一点。因此,我尝试编写一些自定义代码,如下所示:

public Object fetch(String source,String sink) {

    //reading data
    boolean isDir=new File(source).isDirectory();
    System.out.println("isDir="+isDir);
    JavaRDD<String> lines;
    if(isDir)
    {

        lines=readFiles(new File(source).listFiles(), null);
    }
    else
        lines= sc.textFile(source);

    lines.saveAsTextFile(sink);
    return true;
}

public static JavaRDD<String> readFiles(File[] files,JavaRDD<String> lines) {
    for (File file : files) {
        if (file.isDirectory()) {
            readFiles(file.listFiles(),lines); // Calls same method again.
        } 
        else {
            if(lines==null)
                lines=sc.textFile(file.getPath());
            else
            {
                JavaRDD<String> r=sc.textFile(file.getPath());
                lines.union(r);
            }
        }
    }
    return lines;
}

但这没有完成我的预期工作,因为isDir包含错误信息,表明它不是目录。请您指导我怎么了?有没有有效的方法来完成这项工作?非常感谢

强大的力量

由于spark可以基于Hadoop Job配置读取数据,因此可以使用该FileInputFormat#setInputDirRecursive方法。

JavaSparkContext context = new JavaSparkContext();

Job job;

try {
  job = Job.getInstance();
  FileInputFormat.setInputPaths(job, new Path("/path/to/input/directory));
  FileInputFormat.setInputDirRecursive(job, true);
} catch (IOException e1) {
  e1.printStackTrace();
  System.exit(1);
}

JavaRDD<Text> sourceData = context.newAPIHadoopRDD(job.getConfiguration(), TextInputFormat.class, LongWritable.class, Text.class)
  .values();

显然,您最终将获得文本数据类型而不是字符串。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何使Spark会话递归读取所有文件?

来自分类Dev

Spark,Java中的DataFrame转换

来自分类Dev

Apache Spark中的Java要求

来自分类Dev

Apache Spark中的Java要求

来自分类Dev

Spark仅在HDFS中读取

来自分类常见问题

Java中带有Spark文件流的检查点

来自分类Dev

如何以递归方式获取 Spark DataFrame 中的所有列

来自分类Dev

如何使用Spark快速从map()中的HDFS中读取文件

来自分类Dev

如何使用Spark快速从map()中的HDFS中读取文件

来自分类Dev

Spark流无法读取从HDFS中的水槽创建的文件

来自分类Dev

在Spark中从HDFS或S3读取边缘DB文件

来自分类Dev

如何在Spark中读取HDFS序列文件

来自分类Dev

在 Scala/Spark 中从 HDFS 读取文本文件

来自分类Dev

在Java中使用Apache Spark读取TSV文件的最佳方法

来自分类Dev

在Spark Java中提供静态文件

来自分类Dev

如何使用Java Spark下载文件?

来自分类Dev

从Yaml文件加载Java Spark配置

来自分类Dev

如何使用Java Spark提供CSV文件?

来自分类Dev

Spark Datastax Java API Select语句

来自分类Dev

在Spark Java中的Cookie中设置域

来自分类Dev

如何使用Java从Spark中的kafka读取流嵌套的JSON

来自分类Dev

Java的Spark MLlib中的矩阵运算

来自分类Dev

Spark中的java.lang.NoSuchMethodError

来自分类Dev

Spark DataFrame.flatMap在Java中的用法

来自分类Dev

在Spark Scala中引用Java嵌套类

来自分类Dev

spark作业中的静态变量-Java

来自分类Dev

Apache Spark MLlib:Java中的OLS回归

来自分类Dev

Java中的Spark sql选择和减少

来自分类Dev

无法使用Spark从HDFS读取文件