导入和使用Google Cloud软件包

蒂斯

我创建了一个管道,它吐出了一个数字列表。这些数字流入ParDo,在ParDo中,我使用数字查询Bigquery表,然后返回查询结果。

这在本地有效。Linux,Python 3.7,google-cloud-bigquery 1.22.0

当我将作业上传到数据流时,事情变得很有趣。我在顶层执行的所有操作均不适用于以下功能。因此,我必须在每个函数中导​​入所有用过的软件包,以便可用。

这太丑了,我怀疑我做错了什么。但是呢

所以我得到这样的功能:

def flatten(elements):
    import datetime
    for element in elements['transactionids']:
        print('flatten: ' + str(element) + ' ' + datetime.datetime.now().isoformat())
        yield element

我得到了这样的“ DoFn类”:

class TransformTransaction(beam.DoFn):
    def setup(self):
        print("This will never run. Why?")

    def start_bundle(self):
        print("Bundle Start")
        from google.cloud import bigquery
        self.client = bigquery.Client()
        self.dataset_id = 'mydataset'
        self.table_id = 'testhijs'
        self.table_ref = self.client.dataset(self.dataset_id).table(self.table_id)
        self.table = self.client.get_table(self.table_ref)

   def retrieveTransactionData(self, transactionID):
        query = f"select transactionID, someNr from `thijs-dev.thijsset.thijstable` " \
                f"where transactionID = {transactionID}"

    query_job = self.client.query(
        query,
        location="EU",
    )  
    print(query_job)

    transactions = []

    for row in query_job:
        transactions.append(row)

    return transactions
马内西奥兹

使用管道配置--save_main_session这将导致全局名称空间的状态被腌制并加载到Cloud Dataflow工作程序上。

Python中的完整示例:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import argparse

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
          '--runner=DataflowRunner',
          '--project=proj',
          '--region=region',
          '--staging_location=gs://bucket/staging/',
          '--temp_location=gs://bucket/temp/',
          '--job_name=name',
          '--setup_file=./setup.py'
          ]) 

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True #this is what you need to include 

编辑:链接到文档

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Go,Google Cloud Appengine本地软件包

来自分类Dev

Google Cloud Function无法安装python软件包

来自分类Dev

React Native:无法导入和使用外部软件包

来自分类Dev

使用Webpacker和Rails导入npm软件包的图像

来自分类Dev

React Native:无法导入和使用外部软件包

来自分类Dev

使用NAMESPACE导入/导出软件包

来自分类Dev

如何使用导入的软件包(JsonRpc)

来自分类Dev

SparkR和软件包

来自分类Dev

SparkR和软件包

来自分类Dev

使用Go软件包

来自分类Dev

使用npm软件包

来自分类Dev

导入numpy和Scipy软件包的模块/子软件包的差异

来自分类Dev

列出精确地使用R代码导入给定软件包的软件包?

来自分类Dev

软件包中的python导入模块

来自分类Dev

无法导入已安装的软件包

来自分类Dev

无法从jsqlparser库导入软件包

来自分类Dev

导入不带Java名称的软件包

来自分类Dev

如何正确导入软件包

来自分类Dev

PyDev是从站点软件包中的软件包而不是开发软件包中导入的(绝对导入)

来自分类Dev

Google App Engine:无法导入Go软件包

来自分类Dev

自动导入的软件包的顺序和歧义

来自分类Dev

Django升级,导入错误和软件包名称冲突

来自分类Dev

ES6导入和npm软件包

来自分类Dev

如何在Google Cloud Composer中安装PYPI软件包(Facebook-business,Google-cloud-secret-manager)

来自分类Dev

导入未使用的软件包是否不好?

来自分类Dev

如何使用py.test正确导入软件包?

来自分类Dev

使用导入的自定义软件包加载iPython

来自分类Dev

无法导入使用pip安装的软件包

来自分类Dev

如何使用py.test正确导入软件包?