How to run dynamic second query in google cloud dataflow?

Eric H

I'm attempting an operation where I get a list of ids via a query, transform them into a comma-separated string (i.e. "1,2,3"), and then use that string in a second query. When I add the second query to the pipeline, I get a compile error:

"Target type of a lambda conversion must be an interface"

    String query = "SELECT DISTINCT campaignId FROM `" + options.getEligibilityInputTable() + "` ";

    Pipeline p = Pipeline.create(options);
    p.apply("GetCampaignIds", BigQueryIO.readTableRows()
        .withTemplateCompatibility()
        .fromQuery(query)
        .usingStandardSql())
     .apply("TransformCampaignIds",
        MapElements.into(TypeDescriptors.strings())
            .via((TableRow row) -> (String) row.get("campaignId")))
     .apply(Combine.globally(new StringToCsvCombineFn()))
     .apply("GetAllCampaigns", campaignIds -> BigQueryIO.readTableRows()
        .withTemplateCompatibility()
        .fromQuery("SELECT id AS campaignId, dataQuery FROM `{projectid}.mysql_standard.campaigns` WHERE campaignId IN (" + campaignIds + ")")
        .usingStandardSql())
    ....

How can I chain queries together?

Pablo

Unfortunately, you cannot do this with the existing sources. You have two options:

  • You manually call the BQ API from a ParDo.
  • You write a complex SQL query that does this for you.
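The first option can be sketched roughly as follows. This is a hedged sketch, not code from the answer: `CampaignQueryBuilder` is a hypothetical helper, the table name reuses the question's `{projectid}.mysql_standard.campaigns` placeholder, and the BigQuery client calls are shown only in outline, since they require the google-cloud-bigquery library and credentials to run:

```java
// Sketch of option 1 (calling the BigQuery API yourself): the CSV of ids
// produced by the Combine step becomes the second query, which a DoFn then
// runs directly against BigQuery. Names below are placeholders.
public class CampaignQueryBuilder {

    // Builds the second query from the comma-separated campaign ids.
    public static String buildQuery(String campaignIdsCsv) {
        return "SELECT id AS campaignId, dataQuery "
             + "FROM `{projectid}.mysql_standard.campaigns` "
             + "WHERE id IN (" + campaignIdsCsv + ")";
    }

    // Inside a Beam DoFn<String, TableRow>, processElement would look
    // roughly like this (com.google.cloud.bigquery client, not runnable
    // without credentials):
    //
    //   String query = buildQuery(c.element());
    //   BigQuery bq = BigQueryOptions.getDefaultInstance().getService();
    //   TableResult result =
    //       bq.query(QueryJobConfiguration.newBuilder(query).build());
    //   for (FieldValueList row : result.iterateAll()) {
    //       c.output(row.get("dataQuery").getStringValue());
    //   }

    public static void main(String[] args) {
        System.out.println(buildQuery("1,2,3"));
    }
}
```

The point of this approach is that the query string is assembled per element at pipeline run time, which `BigQueryIO.fromQuery` cannot do: its query string is fixed when the pipeline is constructed.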

The second option looks something like this (note that the filter uses the column name id, since the campaignId alias is not visible in the WHERE clause):

String query = "SELECT id AS campaignId, dataQuery "
             + "FROM `{projectid}.mysql_standard.campaigns` "
             + "WHERE id IN ("
             + "  SELECT DISTINCT campaignId "
             + "  FROM `" + options.getEligibilityInputTable() + "`)";

Pipeline p = Pipeline.create(options);
p.apply("GetAllCampaigns", BigQueryIO.readTableRows()
                                     .withTemplateCompatibility()
                                     .fromQuery(query)
                                     .usingStandardSql());
