我制作了一个函数,每5〜6秒将.CSV数据插入到BigQuery中。我一直在寻找避免插入后在BigQuery中重复数据的方法。我想删除具有相同luid的数据,但是我不知道如何删除它,因此可以在插入之前检查BigQuery表中是否已经存在.CSV的每个数据。我放置了row_ids参数以避免重复的luid,但是看来效果不佳。你能给我个主意吗?谢谢。
def stream_upload():
# BigQuery
client = bigquery.Client()
project_id = 'test'
dataset_name = 'test'
table_name = "test"
full_table_name = dataset_name + '.' + table_name
json_rows = []
with open('./test.csv','r') as f:
for line in csv.DictReader(f):
del line[None]
line_json = dict(line)
json_rows.append(line_json)
errors = client.insert_rows_json(
full_table_name,json_rows,row_ids=[row['luid'] for row in json_rows]
)
if errors == []:
print("New rows have been added.")
else:
print("Encountered errors while inserting rows: {}".format(errors))
print("end")
schedule.every(0.5).seconds.do(stream_upload)
while True:
schedule.run_pending()
time.sleep(0.1)
BigQuery没有本机处理此问题的方法。您可以根据该表创建一个视图以执行重复数据删除,也可以创建表的外部缓存luids
并进行查找,如果它们在写入之前已经被写入BigQuery并在写入新数据后更新了缓存。这可以像文件缓存一样简单,也可以使用其他数据库。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句