I'm trying to use Beam to load data from a CSV file in GCS into BQ, but I get a NoneType error when calling WriteToBigQuery. The error message:
AttributeError: 'NoneType' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
My pipeline code:
import apache_beam as beam
from apache_beam.pipeline import PipelineOptions
from apache_beam.io.textio import ReadFromText
options = {
    'project': project,
    'region': region,
    'temp_location': bucket,
    'staging_location': bucket,
    'setup_file': './setup.py'
}
class Split(beam.DoFn):
    def process(self, element):
        n, cc = element.split(",")
        return [{
            'n': int(n.strip('"')),
            'connection_country': str(cc.strip()),
        }]
pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)

with beam.Pipeline(options=pipeline_options) as pipeline:
    (pipeline
     | 'Read from GCS' >> ReadFromText('file_path*', skip_header_lines=1)
     | 'parse input' >> beam.ParDo(Split())
     | 'print' >> beam.Map(print)
     | 'Write to BQ' >> beam.io.WriteToBigQuery(
         'from_gcs', 'demo', schema='n:INTEGER, connection_country:STRING',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    )
My CSV looks like this:

The output at the print() stage looks like this:

Any help is appreciated!
Since the print function does not return anything, every element comes out of that step as None, so no valid elements reach the WriteToBQ step. You can fix it with:
def print_fn(element):
    print(element)
    return element

{..}
| 'print' >> beam.Map(print_fn)  # Note that now I'm referencing the fn
| 'Write to BQ' >> beam.io.WriteToBigQuery(
{..}
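The root cause can be seen with plain Python, no Beam required: print() returns None, so a Map(print) step replaces every element with None, and that None is what WriteToBigQuery tries to call .items() on. A minimal sketch:

```python
# print() returns None, so mapping print over elements discards them
rows = [{'n': 1, 'connection_country': 'US'}]

# This is effectively what beam.Map(print) does to each element:
mapped = [print(row) for row in rows]

# Every element has become None -- this is what reaches WriteToBigQuery
assert mapped == [None]
```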
Also, if you're running this in Dataflow, print output won't show up, but you can use logging.info() instead.
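A minimal sketch of a logging-based pass-through step (the function name log_fn is my own choice, not from the original answer):

```python
import logging

def log_fn(element):
    # logging.info() ends up in the Dataflow worker logs, unlike print()
    logging.info("element: %s", element)
    return element  # pass the element through unchanged to the next step

# In the pipeline, replace the print step with:
#   | 'log' >> beam.Map(log_fn)
```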