我是新手,Spark API
并在此过程中学习。
我在 hadoop 目录中有多个文件,我正在使用 WholeTextFiles 读取这些文件以创建JavaPairRDD<String, String>
. 程序是用Java编写的。
我的要求是处理目录中的文件列表并实现以下输出:
文件路径,字
文件路径,字
文件路径,字
...
这基本上是具有对应文件名(或路径)的文件的单词内容配对为<String, String>
.
我尝试了以下但不允许从 tuple2 转换为 Iterable(在运行时失败):
JavaSparkContext sc = new JavaSparkContext(new SparkConf()) ;
JavaPairRDD<String, String> files = sc.wholeTextFiles(args[0]);
JavaRDD<Tuple2<String, String>> file_word = files
.flatMap(new FlatMapFunction<Tuple2<String,String>, Tuple2<String,String>>()
{
public Iterable<Tuple2<String, String>> call(Tuple2<String, String> tuple)
{
return (Iterable<Tuple2<String, String>>) new Tuple2<String, Iterable<String>>(tuple._1(), Arrays.asList(((String) tuple._2()).toLowerCase ().split("\\W+")));
}
});
我正在使用Java 8
,Hadoop2
与Spark 2.2.0
.
(通过查看这里的其他问题,我可以理解在 Scala 中编写这个更容易,但是我没有找到与 Java 相关的答案)
寻找解决方案。谢谢你。
据我所知,您正试图将 Tuple2 转换为 Iterable,但它无法工作。
由于您使用的是 java8,您可以使用 lambda 表达式编写它,这将使事情变得更加紧凑:
JavaPairRDD<String, String> rdd = sc
.wholeTextFiles("path_to_data/*")
.flatMapValues(x -> Arrays.asList(x.split("\\W+")));
请注意,我使用的是flatMapValues
而不是flatMap
因为您只需要处理元组的第二个值。
如果你很好奇,flatmap
你可以通过将文件的每个单词映射到一个元组 (fileName, word) 来完成它:
JavaRDD<Tuple2<String, String>> rdd2 = sc
.wholeTextFiles("path_to_data/*")
.flatMap(x -> Arrays.asList(x._2.split("\\n"))
.stream()
.map(w -> new Tuple2<>(x._1, w))
.iterator());
flatMapValues
只是让你用更少的代码做到这一点;-)
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句