PySpark RDD split issue

RSH

I am trying to filter an RDD for records whose value is '01-10-2019'.

print("\n ### Remove duplicates in merged RDD:")

insuredata = insuredatamerged_cache.distinct()
print("insuredata: ",type(insuredata))

print("\n  ### Increase partition to 8 in merged RDD:")
insuredata.getNumPartitions()
insuredatarepart = insuredata.repartition(8)
insuredatarepart.getNumPartitions()

print("insuredatarepart:",type(insuredatarepart))

print("\n ### Split RDD with business date field:")

rdd_201901001 = insuredatarepart.map(lambda y: y.split(",",-1)).filter(lambda x: u'01-10-2019' in x)

print(" ### count of rdd_201901001:",rdd_201901001.count())

Input data:

where insuredatarepart is of class 'pyspark.rdd.RDD', and the dataset below is a list of Row values:

Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='96601', IssuerId2='96601', MarketCoverage=u'SHOP (Small Group)', NetworkName=u'Select Network', NetworkURL=u'http://il.coventryproviders.com', SourceName=u'SERFF', StateCode=u'IL', custnum='13')
Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'Yes', IssuerId='37001', IssuerId2='37001', MarketCoverage=u'Individual', NetworkName=u'HumanaDental PPO/Traditional Preferred', NetworkURL=u'https://www.humana.com/finder/search?customerId=1085&pfpkey=317', SourceName=u'HIOS', StateCode=u'GA', custnum='13')
Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='54172', IssuerId2='54172', MarketCoverage=u'Individual', NetworkName=u'Molina Marketplace', NetworkURL=u'https://eportal.molinahealthcare.com/Provider/ProviderSearch?RedirectFrom=MolinaStaticWeb&State=fl&Coverage=MMP', SourceName=u'HIOS', StateCode=u'FL', custnum='14')

The exception is shown below:

### Remove duplicates in merged RDD:
insuredata:  <class 'pyspark.rdd.PipelinedRDD'>
 Result Count after duplicates removed:  1407
 Result Count of duplicates removed:  1

### Increase partition to 8 in merged RDD:
insuredatarepart: <class 'pyspark.rdd.RDD'>

### Split RDD with business date field:
20/02/05 19:11:43 ERROR Executor: Exception in task 0.0 in stage 74.0 (TID 150)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1008, in <lambda>
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1008, in <genexpr>
  File "/home/hduser/sparkdata2/script/insurance_info2_new.py", line 294, in <lambda>
    rdd_201901001 = insuredatarepart.map(lambda y: y.split(",",-1)).filter(lambda x: u'01-10-2019' in x)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1502, in __getattr__
    raise AttributeError(item)
AttributeError: split

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
xenodevil

From the printed output you provided, it looks like you have an RDD of Row objects.

Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='96601', IssuerId2='96601', MarketCoverage=u'SHOP (Small Group)', NetworkName=u'Select Network', NetworkURL=u'http://il.coventryproviders.com', SourceName=u'SERFF', StateCode=u'IL', custnum='13')
Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'Yes', IssuerId='37001', IssuerId2='37001', MarketCoverage=u'Individual', NetworkName=u'HumanaDental PPO/Traditional Preferred', NetworkURL=u'https://www.humana.com/finder/search?customerId=1085&pfpkey=317', SourceName=u'HIOS', StateCode=u'GA', custnum='13')
Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='54172', IssuerId2='54172', MarketCoverage=u'Individual', NetworkName=u'Molina Marketplace', NetworkURL=u'https://eportal.molinahealthcare.com/Provider/ProviderSearch?RedirectFrom=MolinaStaticWeb&State=fl&Coverage=MMP', SourceName=u'HIOS', StateCode=u'FL', custnum='14')

Here you do not need to call a split function at all: whatever process produced these elements appears to have already split them into separate fields. You can access each field by its index.
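For example, the Row type in pyspark.sql supports lookup by position or by field name, while any unknown attribute (such as split) raises exactly the AttributeError shown in the traceback. A standalone sketch, using a trimmed-down placeholder Row rather than the full record from the question:

from pyspark.sql import Row

# Placeholder Row with only a few of the original fields.
r = Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', StateCode=u'IL')

print(r[0])                    # u'01-10-2019' -- access by position
print(r.BusinessDate)          # u'01-10-2019' -- access by field name
print(u'01-10-2019' in r[0])   # True -- substring test on the first field

# r.split(",")                 # would raise AttributeError: split, as in the traceback

With that behaviour in mind, the map/split step can be dropped and the filter applied directly to the Row: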

rdd_201901001 = insuredatarepart.filter(lambda x: u'01-10-2019' in x[0])

Note that the map has been removed and an index has been added in the filter clause, as in x[0].
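To see the whole thing in one place, here is a minimal, self-contained sketch of that approach, assuming a local SparkContext and two made-up Row records in place of the real insuredatarepart data:

from pyspark import SparkContext
from pyspark.sql import Row

sc = SparkContext.getOrCreate()

# Two placeholder records; only the first carries the target BusinessDate.
rows = [
    Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', StateCode=u'IL'),
    Row(BusinessDate=u'15-11-2019', DentalOnlyPlan=u'Yes', StateCode=u'GA'),
]
insuredatarepart = sc.parallelize(rows)

# Filter directly on the first field (BusinessDate); no map/split needed.
rdd_201901001 = insuredatarepart.filter(lambda x: u'01-10-2019' in x[0])
print(rdd_201901001.count())   # 1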

If your Row contained only a single string-type field (based on the output you shared, it does not), you would still need to call split on the zeroth element rather than on the Row itself, and the statement could have been:

rdd_201901001 = insuredatarepart.map(lambda y: y[0].split(",",-1)).filter(lambda x: u'01-10-2019' in x[0])

Note that the index is applied in both the map and the filter operations. This gives you an RDD of lists of strings, which you would then need to stitch back together.
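A small sketch of that hypothetical single-string-field case, again with made-up records; the final join is one way the resulting lists of strings could be stitched back into single strings:

from pyspark import SparkContext
from pyspark.sql import Row

sc = SparkContext.getOrCreate()

# Hypothetical shape: each Row holds one comma-separated string in its only field.
lines = [
    Row(value=u'01-10-2019,No,96601,IL'),
    Row(value=u'15-11-2019,Yes,37001,GA'),
]
rdd = sc.parallelize(lines)

# Split the zeroth element of each Row, then filter on the first token.
rdd_201901001 = rdd.map(lambda y: y[0].split(",", -1)).filter(lambda x: u'01-10-2019' in x[0])

# Each element is now a list of strings; join them back if single strings are needed.
print(rdd_201901001.map(lambda fields: u",".join(fields)).collect())
# [u'01-10-2019,No,96601,IL']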
