Amazon S3から大きなサイズのJSONファイルを読み取る際にread()メソッドを使用する場合のMemoryError

ジェラルドミラノ

Pythonを使用してAmazonS3からAWSRDS-PostgreSQLに大きなサイズのJSONファイルをインポートしようとしています。しかし、これらのエラーが発生しました、

トレースバック(最後の最後の呼び出し):

ファイル "my_code.py"、67行目、

file_content = obj ['Body']。read()。decode( 'utf-8')。splitlines(True)

ファイル「/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/response.py」、76行目、読み取り中

チャンク= self._raw_stream.read(amt)

ファイル "/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/response.py"、行239 、読んで

data = self._fp.read()

ファイル "/usr/lib64/python3.6/http/client.py"、行462、読み取り中

s = self._safe_read(self.length)

_safe_read内のファイル "/usr/lib64/python3.6/http/client.py"、行617

b ""。join(s)を返します

MemoryError

// my_code.py

import sys
import boto3
import psycopg2
import zipfile
import io
import json

s3 = boto3.client('s3', aws_access_key_id=<aws_access_key_id>, aws_secret_access_key=<aws_secret_access_key>)
connection = psycopg2.connect(host=<host>, dbname=<dbname>, user=<user>, password=<password>)
cursor = connection.cursor()

bucket = sys.argv[1]
key = sys.argv[2]
obj = s3.get_object(Bucket=bucket, Key=key)

def insert_query(data):
    query = """
        INSERT INTO data_table
        SELECT
            (src.test->>'url')::varchar, (src.test->>'id')::bigint,
            (src.test->>'external_id')::bigint, (src.test->>'via')::jsonb
        FROM (SELECT CAST(%s AS JSONB) AS test) src
    """
    cursor.execute(query, (json.dumps(data),))


if key.endswith('.zip'):
    zip_files = obj['Body'].read()
    with io.BytesIO(zip_files) as zf:
        zf.seek(0)
        with zipfile.ZipFile(zf, mode='r') as z:
            for filename in z.namelist():
                with z.open(filename) as f:
                    for line in f:
                        insert_query(json.loads(line.decode('utf-8')))
if key.endswith('.json'):
    file_content = obj['Body'].read().decode('utf-8').splitlines(True)
    for line in file_content:
        insert_query(json.loads(line))


connection.commit()
connection.close()

これらの問題に対する解決策はありますか?どんな助けでもいいでしょう、どうもありがとう!

ShadowRanger

入力ファイル全体listを行としてメモリに丸呑みすることを回避することで、大幅な節約が可能になります

具体的には、これらの行は、bytesファイル全体のサイズのオブジェクトのピークメモリ使用量に加えてlist、ファイルの完全な内容を含む行のピークを伴うという点で、メモリ使用量に関してひどいものです。

file_content = obj['Body'].read().decode('utf-8').splitlines(True)
for line in file_content:

500万行の1GB ASCIIテキストファイルの場合、64ビットPython 3.3以降では、オブジェクト、、、およびの個々のsだけで約2.3GBのピークメモリ要件になります。処理するファイルのサイズの2.3倍のRAMを必要とするプログラムは、大きなファイルに拡張できません。bytesliststrlist

修正するには、元のコードを次のように変更します。

file_content = io.TextIOWrapper(obj['Body'], encoding='utf-8')
for line in file_content:

obj['Body']レイジーストリーミングに使用できるように見える場合、これにより、ファイルデータ全体の両方のコピーがメモリから削除されます。TextIOWrapper手段の使用obj['Body']、(一度に数KBの)チャンクで遅延読み取りおよびデコードされ、行も遅延反復されます。これにより、ファイルサイズに関係なく、メモリ需要が小さく、ほぼ固定された量に削減されます(ピークメモリコストは最長の行の長さに依存します)。

更新:

ABCをStreamingBody実装していないようio.BufferedIOBaseです。それは持っていない、独自の文書化されたAPIを同様の目的のために使用することができること、けれども。あなたがあなたのTextIOWrapperために仕事をすることができないなら(それが仕事をすることができればそれははるかに効率的で簡単です)、別の方法はすることです:

file_content = (line.decode('utf-8') for line in obj['Body'].iter_lines())
for line in file_content:

を使用するTextIOWrapper場合とは異なり、ブロックの一括デコード(各行は個別にデコードされます)のメリットはありませんが、それ以外の場合は、メモリ使用量の削減という点で同じメリットが得られるはずです。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

Related 関連記事

ホットタグ

アーカイブ