我想filter
删除RDD中字段“状态”不等于“确定”的元素。我从HDFS上的一组CSV文件中创建了RDD,然后map
在尝试执行filter
以下操作之前获取所需的结构:
import csv, StringIO
files = "/hdfs_path/*.csv"
fields = ["time", "status"]
dial = "excel"
default = {'status': 'OK', 'time': '2014-01-01 00:00:00'}
def loadRecord(line, fieldnames, dialect):
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames = fieldnames, dialect = dialect)
try:
line = reader.next()
if line is None:
return default
else:
return line
except:
return default
harmonics = sc.textFile(files) \
.map(lambda x: loadRecord(x, fields, dial)) \
.filter(lambda x: "OK" not in x['status'])
我可以对该RDD执行其他操作-例如map
,get
仅对某些字段执行其他操作,等等。但是,当我使用时运行我的代码时filter
,其中一个任务总是会失败,但filter
lambda函数会出现异常:
'NoneType object is not iterable'
我认为这意味着filter
lambda正在接收None
,因此我添加了代码loadRecord
以避免返回None
。但是,我仍然遇到相同的错误。它确实适用于较小的样本数据集,但是我的实际数据足够大,我不确定如何检测其中的哪一部分可能引起问题。
任何输入表示赞赏!
首先,map(lambda x: loadRecord(x, fields, dial))
使用map(lambda x: (x, loadRecord(x, fields, dial)))
-保存原始记录和已解析的记录。
其次,将filter()
call替换为flatMap(test_function)
并定义test_function
测试输入的方式,如果第二个传递的参数为None(已解析的记录),它将返回第一个参数。
这样,您将获得导致问题的输入行,并在本地对其进行测试。通常,我会添加一行global default
作为loadRecord
函数的第一行
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句