바이너리 파일을 사용하여 병렬 Pyspark에서 RDD를 실행하지 않는 Spark

역 추적

저는 Spark의 초보자이며 Python으로 스크립트를 작성하기 시작했습니다. 내 이해는 Spark가 변환을 병렬 (맵)로 실행한다는 것입니다.

def some_function(name, content):
    print(name, datetime.now())
    time.sleep(30)
    return content

config = SparkConf().setAppName("sample2").setMaster("local[*]")
filesRDD = SparkContext(conf=config).binaryFiles("F:\\usr\\temp\\*.zip")
inputfileRDD = filesRDD.map(lambda job_bundle: (job_bundle[0], some_function(job_bundle[0], job_bundle[1])))
print(inputfileRDD.collect())

위의 코드는 .zip폴더에서 파일 목록을 수집 하여 처리합니다. 내가 그것을 실행할 때 나는 이것이 순차적으로 일어나는 것을보고 있습니다.

산출

file:/F:/usr/temp/sample.zip 2020-10-22 10:42:37.089085
file:/F:/usr/temp/sample1.zip 2020-10-22 10:43:07.103317

30 초 후에 두 번째 파일을 처리하기 시작한 것을 볼 수 있습니다. 첫 번째 파일을 완성한 후 의미합니다. 내 코드에서 무엇이 잘못 되었습니까? RDD를 병렬로 실행하지 않는 이유는 무엇입니까? 제발 날 좀 도와 줄 수 있니 ?

그것은

방법이 binaryFiles스파크 파티션을 가로 질러 파일을 분할하는 방법을 정확히 모르겠습니다 . 반대로 textFiles, 하나의 파티션 만 만드는 경향 이있는 것 같습니다 . dir5 개의 파일이 포함 된 라는 예제 디렉토리를 사용하여 살펴 보겠습니다 .

> ls dir
test1  test2  test3  test4  test5

을 사용 textFile하면 일이 병렬로 실행됩니다. 너무 예쁘지 않기 때문에 출력을 제공하지 않지만 직접 확인할 수 있습니다. .NET과 병렬로 실행되는지 확인할 수 있습니다 getNumPartitions.

>>> sc.textFile("dir").foreach(lambda x: some_function(x, None))
# ugly output, but everything starts at the same time,
# except maybe the last one since you have 4 cores.
>>> sc.textFile("dir").getNumPartitions()
5

함께 binaryFiles가지 다른 어떤 이유로 모든 것이 동일한 파티션에 간다.

>>> sc.binaryFiles("dir").getNumPartitions()
1

나는 심지어 10k 파일로 시도했지만 모든 것이 여전히 동일한 파티션으로 이동합니다. 그 이유는 scala에서 binaryFiles파일 이름과 파일을 읽을 수있는 객체 (그러나 읽기가 수행되지 않음)가있는 RDD를 반환하기 때문이라고 생각합니다 . 따라서 속도가 빠르고 결과 RDD가 작습니다. 따라서 하나의 파티션에있는 것이 좋습니다. 따라서 스칼라에서는 사용 후 다시 파티션을 사용할 수 binaryFiles있으며 모든 것이 잘 작동합니다.

scala> sc.binaryFiles("dir").getNumPartitions
1
scala> sc.binaryFiles("dir").repartition(4).getNumPartitions
4
scala> sc.binaryFiles("dir").repartition(4)
    .foreach{ case (name, ds) => { 
        println(System.currentTimeMillis+": "+name)
        Thread.sleep(2000)
        // do some reading on the DataStream ds
    }}
1603352918396: file:/home/oanicol/sandbox/dir/test1
1603352918396: file:/home/oanicol/sandbox/dir/test3
1603352918396: file:/home/oanicol/sandbox/dir/test4
1603352918396: file:/home/oanicol/sandbox/dir/test5
1603352920397: file:/home/oanicol/sandbox/dir/test2

파이썬의 문제는 binaryFiles실제로 파일을 하나의 단일 파티션으로 읽는 것입니다. 또한 그것은 나에게 매우 신비 스럽지만 pyspark 2.4의 다음 코드 줄은 의미가없는 것과 동일한 동작을 생성합니다.

# this should work but does not
sc.binaryFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))
# this does not work either, which is strange but it would not be advised anyway
# since all the data would be read on one partition
sc.binaryFiles("dir").repartition(4).foreach(lambda x: some_function(x, ''))

그러나 binaryFiles실제로 파일 wholeTextFile을 읽으므로 파일을 텍스트 파일로 읽고 예상대로 작동하는 것을 사용할 수 있습니다 .

# this works
sc.wholeTextFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

pyspark를 사용하여 Spark에서 행 일괄 처리를 얻는 방법

분류에서Dev

Spark를 사용하여 정렬 된 순서로 데이터를 CSV 파일에 병합

분류에서Dev

VBScript를 사용하여 바이너리 파일에서 읽는 방법

분류에서Dev

C에서 fork ()를 사용하여 폴더에있는 각 텍스트 파일의 행을 병렬로 계산

분류에서Dev

sbt-native-packager를 사용하여 Docker에서 바이너리 파일을 실행 가능하게 만드는 방법은 무엇입니까?

분류에서Dev

병렬 처리를 사용하여 파일에서 청크를 읽고 순서대로 문자열을 결합하는 방법이 있습니까?

분류에서Dev

여러 입력 파일을 사용하고 Python에서 병렬 처리를 수행하는 방법은 무엇입니까?

분류에서Dev

Spark에서 같지 않은 절을 사용하여 하위 쿼리를 실행하는 방법은 무엇입니까?

분류에서Dev

awk를 사용하여 두 파일을 병렬로 반복하고 일치 및 일치하지 않는 인쇄

분류에서Dev

MySQL : HEX를 사용하여 중간 Blob 필드 바이너리에 파일을 저장하는 방법

분류에서Dev

Java에서 바이너리 파일을 병합하는 가장 좋은 방법

분류에서Dev

PySpark를 사용하여 병렬로 독립적 인 변환을 실행하는 방법은 무엇입니까?

분류에서Dev

Java ExecutorService를 사용하여 스레드가 병렬로 실행되지 않는 이유는 무엇입니까?

분류에서Dev

Matlab에서 인접 행렬을 사용하여 2 개의 병합 트리를 그리는 방법

분류에서Dev

로컬 테스트를 위해 Spark 스트리밍을 사용하여 로컬 설정에서 두 개의 병렬 사용자 지정 수신기 스트림 실행

분류에서Dev

mapPartitions를 사용하지 않는 SPARK N-gram 및 병렬화

분류에서Dev

OLEDB (VFPOLEDB)를 사용하여 DBF 파일에서 쿼리를 실행하는 것이 너무 느립니다.

분류에서Dev

오이 피처 파일을 병렬로 실행하는 방법

분류에서Dev

Perl을 사용하여 바이너리 파일에서 마지막 16 바이트를 얻는 방법은 무엇입니까?

분류에서Dev

BASH에서 GNU 병렬을 사용하여 curl에 인수를 올바르게 전달하는 방법

분류에서Dev

바이너리 파일에서 오프셋을 사용하여 입력에서 바이트를 쓰는 방법은 무엇입니까?

분류에서Dev

하나의 C ++ 바이너리를 여러 번 병렬로 실행하는 유닉스에 문제가 있습니까?

분류에서Dev

fread ()를 사용하여 바이너리 파일에서 읽기는 추가 문자를 표시합니다.

분류에서Dev

Java에서 Spliterator를 사용하여 병렬 처리 성능을 테스트하는 방법

분류에서Dev

로컬 시스템에서 "병렬"로 rsync를 실행하여 파일을 병렬로 가져 오는 방법은 무엇입니까?

분류에서Dev

파이썬에서 numpy를 사용하여 for 루프를 사용하지 않고 행렬 매핑을 달성하는 방법은 무엇입니까?

분류에서Dev

pyspark를 사용하여 로컬 시스템에서 파일을 읽는 동안 파일 이름 가져 오기

분류에서Dev

Fixture를 실행하지만 병렬 NUnit에서 메소드는 실행하지 않음

분류에서Dev

esmtp, C 코드를 사용하여 바이너리 파일을 첨부 파일로 보낼 때 '\ r'(CR)이 '\ n'(LF) 앞에 추가되는 이유

Related 관련 기사

  1. 1

    pyspark를 사용하여 Spark에서 행 일괄 처리를 얻는 방법

  2. 2

    Spark를 사용하여 정렬 된 순서로 데이터를 CSV 파일에 병합

  3. 3

    VBScript를 사용하여 바이너리 파일에서 읽는 방법

  4. 4

    C에서 fork ()를 사용하여 폴더에있는 각 텍스트 파일의 행을 병렬로 계산

  5. 5

    sbt-native-packager를 사용하여 Docker에서 바이너리 파일을 실행 가능하게 만드는 방법은 무엇입니까?

  6. 6

    병렬 처리를 사용하여 파일에서 청크를 읽고 순서대로 문자열을 결합하는 방법이 있습니까?

  7. 7

    여러 입력 파일을 사용하고 Python에서 병렬 처리를 수행하는 방법은 무엇입니까?

  8. 8

    Spark에서 같지 않은 절을 사용하여 하위 쿼리를 실행하는 방법은 무엇입니까?

  9. 9

    awk를 사용하여 두 파일을 병렬로 반복하고 일치 및 일치하지 않는 인쇄

  10. 10

    MySQL : HEX를 사용하여 중간 Blob 필드 바이너리에 파일을 저장하는 방법

  11. 11

    Java에서 바이너리 파일을 병합하는 가장 좋은 방법

  12. 12

    PySpark를 사용하여 병렬로 독립적 인 변환을 실행하는 방법은 무엇입니까?

  13. 13

    Java ExecutorService를 사용하여 스레드가 병렬로 실행되지 않는 이유는 무엇입니까?

  14. 14

    Matlab에서 인접 행렬을 사용하여 2 개의 병합 트리를 그리는 방법

  15. 15

    로컬 테스트를 위해 Spark 스트리밍을 사용하여 로컬 설정에서 두 개의 병렬 사용자 지정 수신기 스트림 실행

  16. 16

    mapPartitions를 사용하지 않는 SPARK N-gram 및 병렬화

  17. 17

    OLEDB (VFPOLEDB)를 사용하여 DBF 파일에서 쿼리를 실행하는 것이 너무 느립니다.

  18. 18

    오이 피처 파일을 병렬로 실행하는 방법

  19. 19

    Perl을 사용하여 바이너리 파일에서 마지막 16 바이트를 얻는 방법은 무엇입니까?

  20. 20

    BASH에서 GNU 병렬을 사용하여 curl에 인수를 올바르게 전달하는 방법

  21. 21

    바이너리 파일에서 오프셋을 사용하여 입력에서 바이트를 쓰는 방법은 무엇입니까?

  22. 22

    하나의 C ++ 바이너리를 여러 번 병렬로 실행하는 유닉스에 문제가 있습니까?

  23. 23

    fread ()를 사용하여 바이너리 파일에서 읽기는 추가 문자를 표시합니다.

  24. 24

    Java에서 Spliterator를 사용하여 병렬 처리 성능을 테스트하는 방법

  25. 25

    로컬 시스템에서 "병렬"로 rsync를 실행하여 파일을 병렬로 가져 오는 방법은 무엇입니까?

  26. 26

    파이썬에서 numpy를 사용하여 for 루프를 사용하지 않고 행렬 매핑을 달성하는 방법은 무엇입니까?

  27. 27

    pyspark를 사용하여 로컬 시스템에서 파일을 읽는 동안 파일 이름 가져 오기

  28. 28

    Fixture를 실행하지만 병렬 NUnit에서 메소드는 실행하지 않음

  29. 29

    esmtp, C 코드를 사용하여 바이너리 파일을 첨부 파일로 보낼 때 '\ r'(CR)이 '\ n'(LF) 앞에 추가되는 이유

뜨겁다태그

보관