sc.textFile(path)允许读取HDFS文件,但不接受参数(例如,跳过许多行,has_headers等)。
在“学习Spark” O'Reilly电子书中,建议使用以下函数读取CSV(示例5-12。Python加载CSV示例)
import csv
import StringIO
def loadRecord(line):
"""Parse a CSV line"""
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
我的问题是关于如何对“所取”的行进行选择性处理:
我在这里看到一些不错的解决方案:选择元素范围,但我想看看是否还有其他更简单的方法。
谢谢!
不必担心加载不需要的行/行。当您这样做时:
input = sc.textFile(inputFile)
您没有加载文件。您只是得到一个对象,该对象将允许您对文件进行操作。因此,要提高效率,最好只考虑获得想要的东西。例如:
header = input.take(1)[0]
rows = input.filter(lambda line: line != header)
请注意,这里我没有使用索引来引用要删除的行,而是引用了它的值。这样做的副作用是,其他具有该值的行也将被忽略,但更具有Spark精神,因为Spark会将文本文件分布在节点的不同部分,而行号的概念在每个分区中都会丢失。这也是为什么在Spark(Hadoop)中不容易做到这一点的原因,因为每个分区都应被视为独立的,并且全局行号会破坏此假设。
如果您确实需要使用行号,我建议您将它们添加到Spark之外的文件中(请参阅此处),然后仅在Spark内部按此列进行过滤。
编辑:添加zipWithIndex
了@Daniel Darabos建议的解决方案。
sc.textFile('test.txt')\
.zipWithIndex()\ # [(u'First', 0), (u'Second', 1), ...
.filter(lambda x: x[1]!=5)\ # select columns
.map(lambda x: x[0])\ # [u'First', u'Second'
.collect()
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句