如何使用Apache Beam中的运行时值提供程序写入Big Query?

乔治·S

编辑:我用启用接收器实验性选项的beam.io.WriteToBigQuery来工作。我实际上使用了它,但是我的问题是我试图从包装在str()中的两个变量(数据集+表)“构建”完整的表引用。这是将整个值提供程序参数数据作为字符串,而不是调用get()方法仅获取值。

我正在尝试生成一个数据流模板,然后从GCP云函数调用。(作为参考,我的数据流作业应该读取其中包含一堆文件名的文件,然后从GCS读取所有文件并将其写入BQ )。因此,我需要以某种方式编写它,以便可以使用运行时值提供程序传递BigQuery数据集/表。

目前,我文章的底部是我的代码,其中省略了一些与问题无关的内容。请特别注意BQ_flexible_writer(beam.DoFn)-这是我试图“定制” beam.io.WriteToBigQuery的地方,以便它接受运行时值提供程序。

我的模板生成良好,当我在不提供运行时变量的情况下测试运行管道时(依靠默认值),它成功了,并且在控制台中查看作业时,我看到添加的行。但是,在检查BigQuery时没有数据(三重检查日志中的数据集/表名称是否正确)。不知道它去了哪里或者我可以添加什么日志以了解元素发生了什么?

有什么想法吗?或有关如何使用运行时变量写入BigQuery的建议?我什至可以以我将其包含在DoFn中的方式来调用beam.io.WriteToBigQuery,还是必须将beam.io.WriteToBigQuery后面的实际代码用作代码并使用它?我的“成功”工作DAG,显示了元素由我的自定义BQ编写者进行操作

#=========================================================

class BQ_flexible_writer(beam.DoFn):
    def __init__(self, dataset, table):
        self.dataset = dataset
        self.table = table

    def process(self, element):
        dataset_res = self.dataset.get()
        table_res = self.table.get()
        logging.info('Writing to table: {}.{}'.format(dataset_res,table_res))
        beam.io.WriteToBigQuery(
        #dataset= runtime_options.dataset,
        table = str(dataset_res) + '.' + str(table_res), 
        schema = SCHEMA_ADFImpression,
        project = str(PROJECT_ID), #options.display_data()['project'],
        create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  #'CREATE_IF_NEEDED',#create if does not exist.
        write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND    #'WRITE_APPEND' #add to existing rows,partitoning
        )
# https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#valueprovider
class FileIterator(beam.DoFn):
    def __init__(self, files_bucket):
        self.files_bucket = files_bucket

    def process(self, element):
        files = pd.read_csv(str(element), header=None).values[0].tolist()
        bucket = self.files_bucket.get()
        files = [str(bucket) + '/' + file for file in files]
        logging.info('Files list is: {}'.format(files))
        return files

# https://stackoverflow.com/questions/58240058/ways-of-using-value-provider-parameter-in-python-apache-beam   
class OutputValueProviderFn(beam.DoFn):
    def __init__(self, vp):
        self.vp = vp

    def process(self, unused_elm):
        yield self.vp.get()


class RuntimeOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):

        parser.add_value_provider_argument(
          '--dataset',
          default='EDITED FOR PRIVACY',
          help='BQ dataset to write to',
          type=str)

        parser.add_value_provider_argument(
          '--table',
          default='EDITED FOR PRIVACY',
          required=False,
          help='BQ table to write to',
          type=str)

        parser.add_value_provider_argument(
          '--filename',
          default='EDITED FOR PRIVACY',
          help='Filename of batch file',
          type=str)

        parser.add_value_provider_argument(
          '--batch_bucket',
          default='EDITED FOR PRIVACY',
          help='Bucket for batch file',
          type=str)

        #parser.add_value_provider_argument(
        #   '--bq_schema',
          #default='gs://dataflow-samples/shakespeare/kinglear.txt',
        #  help='Schema to specify for BQ')

        #parser.add_value_provider_argument(
        #   '--schema_list',
          #default='gs://dataflow-samples/shakespeare/kinglear.txt',
        #  help='Schema in list for processing')

        parser.add_value_provider_argument(
          '--files_bucket',
          default='EDITED FOR PRIVACY',
          help='Bucket where the raw files are',
          type=str)

        parser.add_value_provider_argument(
          '--complete_batch',
          default='EDITED FOR PRIVACY',
          help='Bucket where the raw files are',
          type=str)
#=========================================================

def run():
    #====================================
    # TODO PUT AS PARAMETERS 
    #====================================
    JOB_NAME_READING = 'adf-reading'
    JOB_NAME_PROCESSING = 'adf-'

    job_name = '{}-batch--{}'.format(JOB_NAME_PROCESSING,_millis())

    pipeline_options_batch = PipelineOptions()

    runtime_options = pipeline_options_batch.view_as(RuntimeOptions)

    setup_options = pipeline_options_batch.view_as(SetupOptions)
    setup_options.setup_file  = './setup.py'
    google_cloud_options = pipeline_options_batch.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.job_name = job_name
    google_cloud_options.region = 'europe-west1'
    google_cloud_options.staging_location = GCS_STAGING_LOCATION
    google_cloud_options.temp_location = GCS_TMP_LOCATION


    #pipeline_options_batch.view_as(StandardOptions).runner = 'DirectRunner'

    # # If datflow runner [BEGIN]
    pipeline_options_batch.view_as(StandardOptions).runner = 'DataflowRunner'
    pipeline_options_batch.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED'

    #pipeline_options_batch.view_as(WorkerOptions).machine_type = 'n1-standard-96' #'n1-highmem-32' #' 
    pipeline_options_batch.view_as(WorkerOptions).max_num_workers = 10
    #  [END]

    pipeline_options_batch.view_as(SetupOptions).save_main_session = True
    #Needed this in order to pass table to BQ at runtime
    pipeline_options_batch.view_as(DebugOptions).experiments = ['use_beam_bq_sink']


    with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:

        try:

            final_data = (
            pipeline_2
            |'Create empty PCollection' >> beam.Create([None])
            |'Get accepted batch file 1/2:{}'.format(OutputValueProviderFn(runtime_options.complete_batch)) >> beam.ParDo(OutputValueProviderFn(runtime_options.complete_batch))
            |'Get accepted batch file 2/2:{}'.format(OutputValueProviderFn(runtime_options.complete_batch)) >> beam.ParDo(FileIterator(runtime_options.files_bucket))
            |'Read all files' >> beam.io.ReadAllFromText(skip_header_lines=1)
            |'Process all files' >> beam.ParDo(ProcessCSV(),COLUMNS_SCHEMA_0)
            |'Format all files' >> beam.ParDo(AdfDict())
            #|'WriteToBigQuery_{}'.format('test'+str(_millis())) >> beam.io.WriteToBigQuery(
            #        #dataset= runtime_options.dataset,
            #        table = str(runtime_options.dataset) + '.' + str(runtime_options.table), 
            #        schema = SCHEMA_ADFImpression,
            #        project = pipeline_options_batch.view_as(GoogleCloudOptions).project, #options.display_data()['project'],
            #        create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  #'CREATE_IF_NEEDED',#create if does not exist.
            #        write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND    #'WRITE_APPEND' #add to existing rows,partitoning
            #        )
            |'WriteToBigQuery' >> beam.ParDo(BQ_flexible_writer(runtime_options.dataset,runtime_options.table))
            )
        except Exception as exception:
            logging.error(exception)
            pass
Chamikara

请使用以下附加选项运行它。

--experiment=use_beam_bq_sink

没有此功能,Dataflow当前将使用不支持ValueProviders的本机版本覆盖BigQuery接收器。

此外,请注意,不支持将数据集设置为运行时参数。尝试将表参数指定为整个表引用(DATASET.TABLE或PROJECT:DATASET.TABLE)。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

在运行时在数据流python中向'beam.io.BigQuerySource'提供'query'参数

来自分类Dev

如何从Scala中的运行时值创建部分函数

来自分类Dev

如何从运行时值转换为模板参数?

来自分类Dev

使用具有运行时值的过程从表中执行数据验证规则

来自分类Dev

如何使用 xamarin.forms 在 android 中设置外部存储写入运行时权限

来自分类Dev

使用泛型进行 Swift 重构:如何从泛型类型暗示运行时值?

来自分类Dev

当程序在Tkinter中运行时,如何制作新标签?

来自分类Dev

程序在Python中运行时如何清除shell?

来自分类Dev

使用 Dataflow Runner 运行时 Beam Sql 失败

来自分类Dev

如何在运行时为独立应用程序提供可用的 jar?

来自分类Dev

如何评估程序的运行时?

来自分类Dev

程序运行时继续使用bash

来自分类Dev

程序运行时继续使用bash

来自分类Dev

如何在C#中写入运行时生成的.dll文件?

来自分类Dev

当脚本在tmux中运行时使用python写入传感器读取

来自分类Dev

如何装饰依赖于运行时值进行创建的类

来自分类Dev

当依赖项需要运行时值时,如何注入依赖项?

来自分类Dev

使用 DI 的 ASP.NET CORE 2.0 运行时值

来自分类Dev

使用运行时值对宏进行字符串化

来自分类Dev

在nohup模式下运行时,如何为脚本中的read命令提供值。前任。在体内

来自分类Dev

如何解决运行时错误“您必须提供layout_width属性。” 在android中?

来自分类Dev

在 Delphi 中运行时如何运行命令行程序并发送密钥?

来自分类Dev

如何在运行时从BizTalk写入SSO

来自分类Dev

使用Tkinter在运行时写入文本小部件

来自分类Dev

如何使用SparkRunner重新调整Apache Beam

来自分类Dev

如何使用 Apache Beam 模型导航树

来自分类Dev

我如何使用 Apache Beam 实验功能

来自分类Dev

Apache Storm应用程序在运行时失败

来自分类Dev

如何在运行时从python提供Java输入?

Related 相关文章

  1. 1

    在运行时在数据流python中向'beam.io.BigQuerySource'提供'query'参数

  2. 2

    如何从Scala中的运行时值创建部分函数

  3. 3

    如何从运行时值转换为模板参数?

  4. 4

    使用具有运行时值的过程从表中执行数据验证规则

  5. 5

    如何使用 xamarin.forms 在 android 中设置外部存储写入运行时权限

  6. 6

    使用泛型进行 Swift 重构:如何从泛型类型暗示运行时值?

  7. 7

    当程序在Tkinter中运行时,如何制作新标签?

  8. 8

    程序在Python中运行时如何清除shell?

  9. 9

    使用 Dataflow Runner 运行时 Beam Sql 失败

  10. 10

    如何在运行时为独立应用程序提供可用的 jar?

  11. 11

    如何评估程序的运行时?

  12. 12

    程序运行时继续使用bash

  13. 13

    程序运行时继续使用bash

  14. 14

    如何在C#中写入运行时生成的.dll文件?

  15. 15

    当脚本在tmux中运行时使用python写入传感器读取

  16. 16

    如何装饰依赖于运行时值进行创建的类

  17. 17

    当依赖项需要运行时值时,如何注入依赖项?

  18. 18

    使用 DI 的 ASP.NET CORE 2.0 运行时值

  19. 19

    使用运行时值对宏进行字符串化

  20. 20

    在nohup模式下运行时,如何为脚本中的read命令提供值。前任。在体内

  21. 21

    如何解决运行时错误“您必须提供layout_width属性。” 在android中?

  22. 22

    在 Delphi 中运行时如何运行命令行程序并发送密钥?

  23. 23

    如何在运行时从BizTalk写入SSO

  24. 24

    使用Tkinter在运行时写入文本小部件

  25. 25

    如何使用SparkRunner重新调整Apache Beam

  26. 26

    如何使用 Apache Beam 模型导航树

  27. 27

    我如何使用 Apache Beam 实验功能

  28. 28

    Apache Storm应用程序在运行时失败

  29. 29

    如何在运行时从python提供Java输入?

热门标签

归档