Pythonを使用してAmazonS3からAWSRDS-PostgreSQLに大きなサイズのJSONファイルをインポートしようとしています。しかし、これらのエラーが発生しました、
トレースバック(最後の最後の呼び出し):
ファイル "my_code.py"、67行目、
file_content = obj ['Body']。read()。decode( 'utf-8')。splitlines(True)
ファイル「/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/response.py」、76行目、読み取り中
チャンク= self._raw_stream.read(amt)
ファイル "/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/response.py"、行239 、読んで
data = self._fp.read()
ファイル "/usr/lib64/python3.6/http/client.py"、行462、読み取り中
s = self._safe_read(self.length)
_safe_read内のファイル "/usr/lib64/python3.6/http/client.py"、行617
b ""。join(s)を返します
MemoryError
// my_code.py
import sys
import boto3
import psycopg2
import zipfile
import io
import json
s3 = boto3.client('s3', aws_access_key_id=<aws_access_key_id>, aws_secret_access_key=<aws_secret_access_key>)
connection = psycopg2.connect(host=<host>, dbname=<dbname>, user=<user>, password=<password>)
cursor = connection.cursor()
bucket = sys.argv[1]
key = sys.argv[2]
obj = s3.get_object(Bucket=bucket, Key=key)
def insert_query(data):
query = """
INSERT INTO data_table
SELECT
(src.test->>'url')::varchar, (src.test->>'id')::bigint,
(src.test->>'external_id')::bigint, (src.test->>'via')::jsonb
FROM (SELECT CAST(%s AS JSONB) AS test) src
"""
cursor.execute(query, (json.dumps(data),))
if key.endswith('.zip'):
zip_files = obj['Body'].read()
with io.BytesIO(zip_files) as zf:
zf.seek(0)
with zipfile.ZipFile(zf, mode='r') as z:
for filename in z.namelist():
with z.open(filename) as f:
for line in f:
insert_query(json.loads(line.decode('utf-8')))
if key.endswith('.json'):
file_content = obj['Body'].read().decode('utf-8').splitlines(True)
for line in file_content:
insert_query(json.loads(line))
connection.commit()
connection.close()
これらの問題に対する解決策はありますか?どんな助けでもいいでしょう、どうもありがとう!
入力ファイル全体list
を行としてメモリに丸呑みすることを回避することで、大幅な節約が可能になります。
具体的には、これらの行は、bytes
ファイル全体のサイズのオブジェクトのピークメモリ使用量に加えてlist
、ファイルの完全な内容を含む行のピークを伴うという点で、メモリ使用量に関してひどいものです。
file_content = obj['Body'].read().decode('utf-8').splitlines(True)
for line in file_content:
500万行の1GB ASCIIテキストファイルの場合、64ビットPython 3.3以降では、オブジェクト、、、およびの個々のsだけで約2.3GBのピークメモリ要件になります。処理するファイルのサイズの2.3倍のRAMを必要とするプログラムは、大きなファイルに拡張できません。bytes
list
str
list
修正するには、元のコードを次のように変更します。
file_content = io.TextIOWrapper(obj['Body'], encoding='utf-8')
for line in file_content:
obj['Body']
レイジーストリーミングに使用できるように見える場合、これにより、ファイルデータ全体の両方のコピーがメモリから削除されます。TextIOWrapper
手段の使用はobj['Body']
、(一度に数KBの)チャンクで遅延読み取りおよびデコードされ、行も遅延反復されます。これにより、ファイルサイズに関係なく、メモリ需要が小さく、ほぼ固定された量に削減されます(ピークメモリコストは最長の行の長さに依存します)。
更新:
ABCをStreamingBody
実装していないようio.BufferedIOBase
です。それは持っていない、独自の文書化されたAPIを同様の目的のために使用することができること、けれども。あなたがあなたのTextIOWrapper
ために仕事をすることができないなら(それが仕事をすることができればそれははるかに効率的で簡単です)、別の方法はすることです:
file_content = (line.decode('utf-8') for line in obj['Body'].iter_lines())
for line in file_content:
を使用するTextIOWrapper
場合とは異なり、ブロックの一括デコード(各行は個別にデコードされます)のメリットはありませんが、それ以外の場合は、メモリ使用量の削減という点で同じメリットが得られるはずです。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加