我有如下的火花作业代码。在集群上使用以下配置可以正常工作。
String path = "/tmp/one.txt";
JavaRDD<SomeClass> jRDD = spark.read()
.textFile(path)
.javaRDD()
.map(line -> {
return new SomeClass(line);
});
Dataset responseSet = sparkSession.createDataFrame(jRDD, SomeClass.class);
responseSet.write()
.format("text")
.save(path + "processed");
然而,如果我想读取二进制文件(与文本大小相同),则需要更多时间。
String path = "/tmp/one.txt";
JavaRDD<SomeClass> jRDD = sparkContext
.binaryRecords(path, 10000, new Configuration())
.toJavaRDD()
.map(line -> {
return new SomeClass(line);
});
Dataset responseSet = spark.createDataFrame(jRDD, SomeClass.class);
responseSet.write()
.format("text")
.save(path + "processed");
下面是我的配置。
driver-memory 8g
executor-memory 6g
num-executors 16
具有 150 MB 文件的第一个代码所花费的时间为 1.30 分钟。具有 150 MB 文件的第二个代码所花费的时间为 4 分钟。
此外,第一个代码能够在所有 16 个执行程序上运行,而第二个代码仅使用一个。
任何建议为什么它很慢?
我发现了这个问题。该textFile()
方法正在创建16 partitions
(您可以在 RDD 上检查numOfPartitions
usinggetNumPartitions()
方法)而binaryRecords()
仅创建 1 个(Java binaryRecords 不提供指定要创建的分区数的重载方法)。
我增加numOfPartitions
了通过在 RDD 上binaryRecords()
使用repartition(NUM_OF_PARTITIONS)
方法创建的RDD。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句