编辑:启用接收器(sink)实验性选项后,我用 beam.io.WriteToBigQuery 解决了这个问题。其实我之前已经启用了该选项,但问题在于我试图用包在 str() 中的两个变量(数据集 + 表)来“拼接”完整的表引用。这样得到的是整个值提供程序(ValueProvider)参数对象的字符串表示,而不是通过调用 get() 方法取得的实际值。
我正在尝试生成一个 Dataflow 模板,然后从 GCP Cloud Function 调用它。(作为参考,我的 Dataflow 作业应该先读取一个包含一批文件名的文件,然后从 GCS 读取这些文件并将其写入 BigQuery。)因此,我需要让管道能够通过运行时值提供程序(ValueProvider)接收 BigQuery 数据集/表。
目前,我文章的底部是我的代码,其中省略了一些与问题无关的内容。请特别注意BQ_flexible_writer(beam.DoFn)-这是我试图“定制” beam.io.WriteToBigQuery的地方,以便它接受运行时值提供程序。
我的模板可以正常生成;在不提供运行时变量(即依靠默认值)的情况下测试运行管道时,作业执行成功,并且在控制台中查看作业时能看到已添加的行。但是,在 BigQuery 中检查时却没有任何数据(我已再三确认日志中的数据集/表名称是正确的)。我不知道这些数据去了哪里,也不知道可以添加哪些日志来了解这些元素发生了什么。
有什么想法吗?或者对如何使用运行时变量写入 BigQuery 有什么建议?我可以像现在这样把 beam.io.WriteToBigQuery 放在 DoFn 里调用吗?还是必须把 beam.io.WriteToBigQuery 背后的实际实现代码拿出来自行修改使用?
#=========================================================
class BQ_flexible_writer(beam.DoFn):
    """DoFn intended to write elements to a BigQuery table chosen at runtime.

    ``dataset`` and ``table`` are stored as given and resolved with ``.get()``
    inside ``process``, so they are expected to be ValueProvider-like objects.

    NOTE(review): ``process`` only constructs a ``beam.io.WriteToBigQuery``
    object and immediately discards it. ``element`` is never handed to it and
    nothing is returned or yielded, so as written this DoFn does not appear to
    write any rows. Presumably ``WriteToBigQuery`` has to be applied as a
    pipeline step (``pcoll | beam.io.WriteToBigQuery(...)``) rather than being
    instantiated per element -- confirm against the Beam documentation.
    """

    def __init__(self, dataset, table):
        # Keep the providers unresolved here; .get() is only called in process().
        self.dataset = dataset
        self.table = table

    def process(self, element):
        # Resolve the runtime values for this element.
        dataset_res = self.dataset.get()
        table_res = self.table.get()
        logging.info('Writing to table: {}.{}'.format(dataset_res,table_res))
        # NOTE(review): the object built below is not applied to anything and
        # its result is not used; `element` is not referenced.
        beam.io.WriteToBigQuery(
        #dataset= runtime_options.dataset,
        table = str(dataset_res) + '.' + str(table_res),
        schema = SCHEMA_ADFImpression,
        project = str(PROJECT_ID), #options.display_data()['project'],
        create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED, # create the table if it does not exist
        write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND # add to existing rows, partitioning
        )
# https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#valueprovider
class FileIterator(beam.DoFn):
    """Expand a batch listing file into the full paths of the files it names.

    The incoming element is read as a header-less CSV with pandas; the values
    of its first row are taken as file names and each one is prefixed with
    the resolved ``files_bucket`` value and a ``/``.
    """

    def __init__(self, files_bucket):
        # Stored unresolved; .get() is called per element in process().
        self.files_bucket = files_bucket

    def process(self, element):
        """Return the list of ``<bucket>/<name>`` strings for one listing file."""
        listing = pd.read_csv(str(element), header=None)
        # Only the first row of the listing is used.
        names = listing.values[0].tolist()
        prefix = str(self.files_bucket.get())
        files = []
        for name in names:
            files.append(prefix + '/' + name)
        logging.info('Files list is: {}'.format(files))
        return files
# https://stackoverflow.com/questions/58240058/ways-of-using-value-provider-parameter-in-python-apache-beam
class OutputValueProviderFn(beam.DoFn):
    """Emit the resolved value of a value provider, ignoring the input element."""

    def __init__(self, vp):
        # Object exposing .get(); it is resolved only when process() runs.
        self.vp = vp

    def process(self, unused_elm):
        """Yield the provider's current value once per incoming element."""
        resolved = self.vp.get()
        yield resolved
class RuntimeOptions(PipelineOptions):
    """Pipeline options registered as value-provider arguments.

    Every option below is added with ``add_value_provider_argument``, so the
    attribute read from this options view is an object whose value is obtained
    with ``.get()`` rather than a plain string.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register the value-provider arguments on ``parser``."""
        parser.add_value_provider_argument(
            '--dataset',
            default='EDITED FOR PRIVACY',
            help='BQ dataset to write to',
            type=str)
        parser.add_value_provider_argument(
            '--table',
            default='EDITED FOR PRIVACY',
            required=False,
            help='BQ table to write to',
            type=str)
        parser.add_value_provider_argument(
            '--filename',
            default='EDITED FOR PRIVACY',
            help='Filename of batch file',
            type=str)
        parser.add_value_provider_argument(
            '--batch_bucket',
            default='EDITED FOR PRIVACY',
            help='Bucket for batch file',
            type=str)
        # Disabled options, kept for reference:
        # parser.add_value_provider_argument(
        #     '--bq_schema',
        #     help='Schema to specify for BQ')
        # parser.add_value_provider_argument(
        #     '--schema_list',
        #     help='Schema in list for processing')
        parser.add_value_provider_argument(
            '--files_bucket',
            default='EDITED FOR PRIVACY',
            help='Bucket where the raw files are',
            type=str)
        # The help text used to be a copy of the --files_bucket one. The
        # pipeline reads this value as the CSV that lists the files to load.
        parser.add_value_provider_argument(
            '--complete_batch',
            default='EDITED FOR PRIVACY',
            help='Path of the batch file listing the raw files to load',
            type=str)
#=========================================================
def run():
    """Build and run the batch pipeline that loads the listed files into BigQuery.

    Pipeline options are configured in code for the Dataflow runner. The
    destination dataset and table come from the ``RuntimeOptions`` value
    providers and are resolved only when rows are written, so the pipeline can
    be staged as a template and parameterised at launch time.

    Raises:
        Exception: any error raised while the pipeline graph is being built is
            logged and re-raised, so a partially built pipeline is not run.
    """
    # TODO: turn these into parameters.
    JOB_NAME_PROCESSING = 'adf-'
    job_name = '{}-batch--{}'.format(JOB_NAME_PROCESSING, _millis())

    pipeline_options_batch = PipelineOptions()
    runtime_options = pipeline_options_batch.view_as(RuntimeOptions)

    setup_options = pipeline_options_batch.view_as(SetupOptions)
    setup_options.setup_file = './setup.py'
    setup_options.save_main_session = True

    google_cloud_options = pipeline_options_batch.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.job_name = job_name
    google_cloud_options.region = 'europe-west1'
    google_cloud_options.staging_location = GCS_STAGING_LOCATION
    google_cloud_options.temp_location = GCS_TMP_LOCATION

    # Use 'DirectRunner' here instead for a local run.
    pipeline_options_batch.view_as(StandardOptions).runner = 'DataflowRunner'
    worker_options = pipeline_options_batch.view_as(WorkerOptions)
    worker_options.autoscaling_algorithm = 'THROUGHPUT_BASED'
    worker_options.max_num_workers = 10

    # Needed in order to pass the table to BigQuery at runtime.
    pipeline_options_batch.view_as(DebugOptions).experiments = ['use_beam_bq_sink']

    # Capture only what the destination callable needs. The value providers
    # must be resolved with .get() at run time: str(value_provider) yields the
    # provider's own representation, not the value it holds.
    project = str(PROJECT_ID)
    dataset_vp = runtime_options.dataset
    table_vp = runtime_options.table

    def _destination(_row):
        # Called by WriteToBigQuery for each row to pick its destination.
        return '{}:{}.{}'.format(project, dataset_vp.get(), table_vp.get())

    with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:
        try:
            # Step labels are fixed strings; they previously had a throwaway
            # DoFn instance formatted into them.
            _ = (
                pipeline_2
                | 'Create empty PCollection' >> beam.Create([None])
                | 'Get accepted batch file 1/2' >> beam.ParDo(
                    OutputValueProviderFn(runtime_options.complete_batch))
                | 'Get accepted batch file 2/2' >> beam.ParDo(
                    FileIterator(runtime_options.files_bucket))
                | 'Read all files' >> beam.io.ReadAllFromText(skip_header_lines=1)
                | 'Process all files' >> beam.ParDo(ProcessCSV(), COLUMNS_SCHEMA_0)
                | 'Format all files' >> beam.ParDo(AdfDict())
                # WriteToBigQuery is a transform and has to be applied to the
                # PCollection; constructing it inside a DoFn writes nothing.
                | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                    table=_destination,
                    schema=SCHEMA_ADFImpression,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                )
            )
        except Exception:
            # Log with traceback, then abort instead of running a partial graph.
            logging.exception('Failed to build the pipeline')
            raise
请使用以下附加选项运行它。
--experiment=use_beam_bq_sink
如果不加此选项,Dataflow 目前会用不支持 ValueProvider 的原生实现覆盖 BigQuery 接收器。
此外,请注意,不支持将数据集设置为运行时参数。尝试将表参数指定为整个表引用(DATASET.TABLE或PROJECT:DATASET.TABLE)。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句