Airflow S3 Hook 또는 boto3를 사용하는 csv.gzip 인 디렉터리에서 여러 파일을 읽는 방법은 무엇입니까?

스윙의 술탄

S3에 다음 s3://test-bucket/test-folder/2020-08-28/과 같은 파일 이있는 디렉토리 가 있습니다.

2020-08-28 03:29:13   29397684 data_0_0_0.csv.gz
2020-08-28 03:29:13   29000150 data_0_1_0.csv.gz
2020-08-28 03:29:13   38999956 data_0_2_0.csv.gz
2020-08-28 03:29:13   32079942 data_0_3_0.csv.gz
2020-08-28 03:29:13   34154791 data_0_4_0.csv.gz
2020-08-28 03:29:13   45348128 data_0_5_0.csv.gz
2020-08-28 03:29:13   60904419 data_0_6_0.csv.gz

이 파일의 내용을 어딘가에 덤프 하는 S3 후크 ( https://airflow.readthedocs.io/en/stable/_modules/airflow/hooks/S3_hook.html )를 사용하여 Airflow 연산자를 만들려고합니다 . 나는 시도했다 :

contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/),
contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/data_0_0_0.csv)
contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/data_0_0_0.csv.gz)

이들 중 어느 것도 작동하지 않는 것 같습니다. 나는 거기에 있음을 s3.select_key알았지 만 올바른 매개 변수가없는 것 같으며 입력 및 출력 직렬화 만 있습니다. 파일 자체에 아무것도하지 않고 S3 후크를 사용하여이 데이터를 가져올 수있는 방법이 있습니까?

내 다음 문제는 폴더 내에 많은 파일이 있다는 것 s3://test-bucket/test-folder/2020-08-28/입니다. 나는 사용해 list_keys보았지만 버킷 이름이 마음에 들지 않습니다.

keys = s3.list_keys('s3://test-bucket/test-folder/2020-08-28/')

준다

Invalid bucket name "s3://test-bucket/test-folder/2020-08-28/": Bucket name must match the regex "^[a-zA-Z0-9.\-_]{1,255}$"

나는 또한 같은 것을 시도했지만 "s3 : //"를 제거했습니다. 어느 시점에서도 인증 오류가 발생하지 않습니다. 내가 넣어 때 .csv.gz에서 read_key위의 호출, 그것은 나에게 말한다

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte

내가 gzip으로 압축된다는 사실과 관련이 있다고 가정하고 있습니까?

그렇다면 어떻게 1. 압축 된 csv 파일 인 S3에서 키를 읽을 수 있고 2. 주어진 디렉토리 내에서 모든 csv 파일을 한 번에 어떻게 읽을 수 있습니까?

미구엘 트레 호

.NET과 같은 디렉토리에서 파일을 읽는다고 가정합니다 s3://your_bucket/your_directory/YEAR-MONTH-DAY/. 그런 다음 두 가지를 수행 할 수 있습니다.

  • 데이터에 대한 읽기 경로 . .csv.gz각 하위 디렉토리 파일 경로를 읽습니다.

  • 데이터로드 . 이 예제에서 우리는 그것들을로로드 할 것입니다 pandas.DataFrame. 그러나 당신은 그것을 gzip Object로 남겨 둘 수 있습니다.

1. A Airflow S3 후크로 경로 읽기

# Initialize the s3 hook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
s3_hook = S3Hook()

# Read the keys from s3 bucket
paths = s3_hook.list_keys(bucket_name='your_bucket_name', prefix='your_directory')

여기서 키를 나열하려면 뒤에 페이지 지정자를 사용하고 있습니다. 여기에서 경로 목록에서 읽기위한 세 번째 형식에 도달합니다.

1.B 페이지 네이터로 경로 읽기

예를 들어 페이지 지정자의 경우 s3_//your_bucket/your_directory/item.csv.gz, ... 등 의 객체를 나열하려면 페이지 지정자가 다음과 같이 작동합니다 ( 문서 에서 가져온 예 )

client = boto3.client('s3', region_name='us-west-2')
paginator = client.get_paginator('list_objects')
operation_parameters = {'Bucket': 'your_bucket',
                        'Prefix': 'your_directory'}
page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
    print(page['Contents'])

그러면 Key읽을 경로 목록을 얻기 위해 각 사전의을 필터링 할 수있는 사전 목록이 출력됩니다 . 즉, 페이지 매기기가 다음과 같은 것을 던집니다.

[{'Key': 'your_directoyr/file_1.csv.gz
....},
..., 
{'Key': 'your_directoyr/file_n.csv.gz
....}

이제이 작업을 수행하는 세 번째 형식에 도달합니다. 이는 이전과 유사합니다.

1.C Boto 3 클라이언트로 경로 읽기

경로를 읽으려면 다음 기능을 고려하십시오.

import boto3 

s3_client = boto3.client('s3')

def get_all_s3_objects(s3_client, **base_kwargs):
    continuation_token = None
    while True:
        list_kwargs = dict(MaxKeys=1000, **base_kwargs)
        if continuation_token:
            list_kwargs['ContinuationToken'] = continuation_token
        response = s3_client.list_objects_v2(**list_kwargs)
        yield from response.get('Contents', [])
        if not response.get('IsTruncated'):  # At the end of the list?
            break
        continuation_token = response.get('NextContinuationToken')

예를 들어 접미사 키와 버킷 이름을 사용하여이 함수를 호출 할 때

files = get_all_s3_objects(s3_client, Bucket='your_bucket_name', Prefix=f'your_directory/YEAR-MONTH-DAY')
paths = [f['Key'] for f in files]

경로를 호출하면 .csv.gz파일 목록이 표시됩니다 . 귀하의 경우, 이것은

[data_0_0_0.csv.gz,
data_0_1_0.csv.gz,
data_0_2_0.csv.gz]

그런 다음이를 다음 함수의 입력으로 사용하여 데이터를 pandas 데이터 프레임으로 읽을 수 있습니다.

2. 데이터로드

기능 고려

from io import BytesIO
import pandas as pd

def load_csv_gzip(s3_client, bucket, key):
    with BytesIO() as f:
        s3_files = s3_client.download_fileobj(Bucket=bucket,
                           Key=key,
                           Fileobj=f)
        f.seek(0)
        gzip_fd = gzip.GzipFile(fileobj=f)
        return pd.read_csv(gzip_fd)

마지막으로 .csv.gz파일 목록을 제공하고 각 경로를 반복적으로로드하고 결과를 pandas 데이터 프레임에 연결하거나 단일 .csv.gz파일 만로드 할 수 있습니다 . 예를 들면

data = pd.concat([load_csv_gzip(s3_client, 'your_bucket', path) for p in paths])

경로의 각 요소는 your_subdirectory/2020-08-28/your_file.csv.gz.

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

Related 관련 기사

뜨겁다태그

보관