Python 및 Boto3를 사용하여 데이터를 눈송이로로드하기 위해 S3 버킷에서 선택한 파일을 눈송이 단계로 변환하는 방법

요한 네랑가

s3 버킷에있는 파일을 준비해야합니다. 먼저 주어진 버킷에 업로드 할 최신 파일을 찾은 다음 해당 파일을 전체 버킷이 아닌 stage. 예를 들어 주제라는 버킷이 있다고 가정 해 보겠습니다. 그 안에 두 개의 폴더 topic1과 topic2가 있습니다. 그 2 개의 폴더는 2 개의 파일을 새로 업로드했습니다.이 경우에는 그 데이터를 snowflake에로드하기 위해 새로 업로드 한 파일을 스테이지로 만들어야합니다. 나는 이미 최신 파일을 찾기 위해 코드를 작성했지만 스테이지 로 만드는 방법을 모르겠습니다. 각 파일에 대해 for 루프와 함께 CREATE OR REPLACE STAGE 명령을 사용하면 마지막 파일에 대한 스테이지 만 생성됩니다. 각 파일에 대한 스테이지를 생성하지 않습니다. 어떻게해야합니까?

`def download_s3_files (self) :

s3_object = boto3.client('s3', aws_access_key_id=self.s3_acc_key, aws_secret_access_key=self.s3_sec_key)

    if self.source_as_stage:

        no_of_dir = []

        try:
            bucket = s3_object.list_objects(Bucket=self.s3_bucket, Prefix=self.file_path, Delimiter='/')
            print("object bucket list >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>", bucket)
        except Exception as e:
            self.propagate_log_msg('check [%s] and Source File Location Path' % e)

        for directory in bucket['CommonPrefixes']:
            no_of_dir.append(str(directory['Prefix']).rsplit('/', 2)[-2])

        print(no_of_dir)

        no_of_dir.sort(reverse=True)
        latest_dir = no_of_dir[0]

        self.convert_source_as_stage(latest_dir)

    except Exception as e:
        print(e)
        exit(-1)

def convert_source_as_stage (self, latest_file) :

    source_file_format = str(self.metadata['source_file_format']).lower()+'_format' if self.metadata['source_file_format'] is not None else 'pipe_format'
    url = 's3://{bucket}/{location}/{dir_}'.format(location=self.s3_file_loc.strip("/"),
                                                   bucket=self.s3_bucket, dir_=latest_file)
    print("formateed url>>>>>>>>>>>>>>>>>>", url)
    file_name_dw = str(latest_file.rsplit('/', 1)[-1])

    print("File_Name>>>>>>>>>>>>>", file_name_dw)
    print("Source file format :", source_file_format)
    print("source url: ", url)

    self.create_stage = """

                  CREATE OR REPLACE STAGE {sa}.{table} URL='{url}'
                  CREDENTIALS=(AWS_KEY_ID='{access_key}' AWS_SECRET_KEY='{secret}')
                  FILE_FORMAT = {file};
                  // create or replace stage {sa}.{table}
                  //   file_format = (type = 'csv' field_delimiter = '|' record_delimiter = '\\n');

                  """.format(sa=self.ss_cd, table=self.table.lower(), access_key=self.s3_acc_key, secret=self.s3_sec_key,
                             url=url, file=source_file_format, filename=str(self.metadata['source_table']))

    """"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
            '''CONNECT TO SNOWFLAKE''''''''''''''''''''''''''''''''''
    """""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""

    print("Create Stage Statement :", self.create_stage)

    con = snowflake.connector.connect(
        user=self.USER,
        password=self.PASSWORD,
        account=self.ACCOUNT,
    )

    self.propagate_log_msg("Env metadata = [%s]" % self.env_metadata)

    """"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
    '''REFRESH DDL''''''''''''''''''''''''''''''''''
    """""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
    try:

        file_format_full_path = os.path.join(self.root, 'sql', str(source_file_format)+'.sql')
        self.create_file_format = open(file_format_full_path, 'r').read()

        self.create_schema = "CREATE schema if not exists {db_lz}.{sa}".format(sa=self.ss_cd, db_lz=self.db_lz)

        env_sql = 'USE database {db_lz}'.format(db_lz=self.db_lz)
        self.propagate_log_msg(env_sql)
        con.cursor().execute(env_sql)
        con.cursor().execute(self.create_schema)

        env_sql = 'USE schema {schema}'.format(schema=self.ss_cd)
        self.propagate_log_msg(env_sql)
        con.cursor().execute(env_sql)

        con.cursor().execute(self.create_file_format)
        con.cursor().execute(self.create_stage)

    except snowflake.connector.ProgrammingError as e:
        self.propagate_log_msg('Invalid sql, fix sql and retry')
        self.propagate_log_msg(e)
        exit()
    except KeyError:
        self.propagate_log_msg(traceback.format_exc())
        self.propagate_log_msg('deploy_ods is not set in schedule metadata, assuming it is False')
    except Exception as e:
        self.propagate_log_msg('unhandled exception, debug')
        self.propagate_log_msg(traceback.format_exc())
        exit()
    else:
        self.propagate_log_msg(
            "Successfully dropped and recreated table/stage for [{sa}.{table}]".format(sa=self.ss_cd,
                                                                                  table=self.table))`
한스 헨릭 에릭 센

아마도 당신은 한 발 물러서서 당신이 성취하려는 것에 대해 더 큰 그림을 줄 수있을 것입니다. 그것은 좋은 조언을하기 위해 다른 사람들을 도울 것입니다.

모범 사례는 전체 버킷에 대해 하나의 Snowflake 를 만드는 STAGE 입니다. STAGE거울 다음 객체 버킷 객체입니다. 예를 들어 설정이 필요한 경우. 버킷의 다른 부분에 대해 다른 권한을 부여한 경우 다른 액세스 권한으로 여러 단계를 만드는 것이 합리적 일 수 있습니다.

단계를 설정하는 목적은 S3 객체를 Snowflake 테이블로 가져 오는 것 같습니다. 이 작업은 COPY INTO <table>명령 으로 수행되며 해당 명령에는 가져올 개체 / 파일 이름을 선택하는 두 가지 옵션이 있습니다.

  1. FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] )
  2. PATTERN = '<regex_pattern>'

데이터베이스에 COPY INTO <table>과도한 양의 STAGE개체 를 만드는 대신 매개 변수에 노력을 기울이는 것이 좋습니다 .

Snowpipes에 대해서도 진지하게 살펴 봐야 합니다. Snowpipes는 S3 COPY INTO <table>에 의해 트리거되는 명령 사용 하여 거의 실시간으로 S3 객체를 Snowflake 테이블로 가져 오는 작업을 수행합니다 . 개체 이벤트를 만듭니다. Snowpipe는 전용 리소스가 아니기 때문에 창고보다 비용이 저렴합니다.
간단하고 효과적입니다.

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

Related 관련 기사

뜨겁다태그

보관