csvファイルから大きなクエリにデータを作成して追加し、Pythonを使用してテーブルを分割するにはどうすればよいですか?

Utsav Chatterjee

Google CloudStorageでcsvgzipファイルを圧縮し、Pythonを使用して、スキーマを自動検出し、命名規則に従ってGoogleBigQueryで新しいテーブルを作成しています。作成中のテーブルをパーティション分割するにはどうすればよいですか?使用したいデータにすでに日付列があります。

# importing libraries
from google.cloud import bigquery

# defining first load list
first_load_list = []

#defining tracker file
tracker_file = open("tracker_file", "a")

#reading values from config file
config_file = open("ingestion.config", "r")
for line in config_file:
    if "project_id" in line:
        project_id = line.split("=")[1].strip()
    elif "dataset" in line:
        dataset = line.split("=")[1].strip()
    elif "gcs_location" in line:
        gcs_location = line.split("=")[1].strip()
    elif "bq1_target_table" in line:
        bq1_target_table = line.split("=")[1].strip()
    elif "bq2_target_table" in line:
        bq2_target_table = line.split("=")[1].strip()
    elif "bq1_first_load_filename" in line:
        bq1_first_load_filename = line.split("=")[1].strip()
        first_load_list.append(bq1_first_load_filename)
    elif "bq2_first_load_filename" in line:
        bq2_first_load_filename = line.split("=")[1].strip()
        first_load_list.append(bq2_first_load_filename)
    elif "gcs_bucket" in line:
        gcs_bucket = line.split("=")[1].strip()

# reading bucket list temp file
bucket_list_file = open("bucket_list.temp", "r")
bucket_list = []
for entry in bucket_list_file:
    bucket_list.append(entry)


# defining client and specifying project
client = bigquery.Client(project_id)
dataset_id = dataset
dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.skip_leading_rows = 1
job_config.source_format = bigquery.SourceFormat.CSV


# leading files into tables based on naming convention
for filename in first_load_list:
    if "BQ2_2" in filename:
        uri = gcs_location + filename
        print "Processing file = " + uri
        load_job = client.load_table_from_uri(
            uri.strip(),
            dataset_ref.table(bq2_target_table),
            job_config=job_config)  # API request
        assert load_job.job_type == 'load'
        load_job.result()  # Waits for table load to complete.
        assert load_job.state == 'DONE'
        assert client.get_table(dataset_ref.table(bq2_target_table))
        tracker_file.write(filename + "\n")
        print filename.strip() + " processing complete\n"
    elif "BQ1_2" in filename:
        uri = gcs_location + filename
        print "Processing file = " + uri
        load_job = client.load_table_from_uri(
            uri.strip(),
            dataset_ref.table(bq1_target_table),
            job_config=job_config)  # API request
        assert load_job.job_type == 'load'
        load_job.result()  # Waits for table load to complete.
        assert load_job.state == 'DONE'
        assert client.get_table(dataset_ref.table(bq1_target_table))
        tracker_file.write(filename + "\n")
        print filename.strip() + " processing complete\n"

tracker_file.close()

これは、最初のロードで実行するコードです。最初のロードテーブルが作成されたら、今後はこれらのテーブルにデータを追加するだけです。https://cloud.google.com/bigquery/docs/creating-partitioned-tablesを見ましたが、Pythonで実装する方法がわかりません。

誰かが私を正しい方向に向けるのを手伝ってもらえますか?

テディ

job_config._properties['load']['timePartitioning'] = {"type":"DAY", 'field':'your_field'}ロード時にパーティションテーブルを作成するために使用できます。テストデータを使用してテストしたところ、期待どおりに機能しました。

APIを使用したパーティションは、'DAY'現時点ではのみサポートされていることに注意してください

GitHubの問題を参照してください

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

Related 関連記事

ホットタグ

アーカイブ