저는 Spark의 초보자이며 Python으로 스크립트를 작성하기 시작했습니다. 내 이해는 Spark가 변환을 병렬 (맵)로 실행한다는 것입니다.
def some_function(name, content):
print(name, datetime.now())
time.sleep(30)
return content
config = SparkConf().setAppName("sample2").setMaster("local[*]")
filesRDD = SparkContext(conf=config).binaryFiles("F:\\usr\\temp\\*.zip")
inputfileRDD = filesRDD.map(lambda job_bundle: (job_bundle[0], some_function(job_bundle[0], job_bundle[1])))
print(inputfileRDD.collect())
위의 코드는 .zip
폴더에서 파일 목록을 수집 하여 처리합니다. 내가 그것을 실행할 때 나는 이것이 순차적으로 일어나는 것을보고 있습니다.
산출
file:/F:/usr/temp/sample.zip 2020-10-22 10:42:37.089085
file:/F:/usr/temp/sample1.zip 2020-10-22 10:43:07.103317
30 초 후에 두 번째 파일을 처리하기 시작한 것을 볼 수 있습니다. 첫 번째 파일을 완성한 후 의미합니다. 내 코드에서 무엇이 잘못 되었습니까? RDD를 병렬로 실행하지 않는 이유는 무엇입니까? 제발 날 좀 도와 줄 수 있니 ?
방법이 binaryFiles
스파크 파티션을 가로 질러 파일을 분할하는 방법을 정확히 모르겠습니다 . 반대로 textFiles
, 하나의 파티션 만 만드는 경향 이있는 것 같습니다 . dir
5 개의 파일이 포함 된 라는 예제 디렉토리를 사용하여 살펴 보겠습니다 .
> ls dir
test1 test2 test3 test4 test5
을 사용 textFile
하면 일이 병렬로 실행됩니다. 너무 예쁘지 않기 때문에 출력을 제공하지 않지만 직접 확인할 수 있습니다. .NET과 병렬로 실행되는지 확인할 수 있습니다 getNumPartitions
.
>>> sc.textFile("dir").foreach(lambda x: some_function(x, None))
# ugly output, but everything starts at the same time,
# except maybe the last one since you have 4 cores.
>>> sc.textFile("dir").getNumPartitions()
5
함께 binaryFiles
가지 다른 어떤 이유로 모든 것이 동일한 파티션에 간다.
>>> sc.binaryFiles("dir").getNumPartitions()
1
나는 심지어 10k 파일로 시도했지만 모든 것이 여전히 동일한 파티션으로 이동합니다. 그 이유는 scala에서 binaryFiles
파일 이름과 파일을 읽을 수있는 객체 (그러나 읽기가 수행되지 않음)가있는 RDD를 반환하기 때문이라고 생각합니다 . 따라서 속도가 빠르고 결과 RDD가 작습니다. 따라서 하나의 파티션에있는 것이 좋습니다. 따라서 스칼라에서는 사용 후 다시 파티션을 사용할 수 binaryFiles
있으며 모든 것이 잘 작동합니다.
scala> sc.binaryFiles("dir").getNumPartitions
1
scala> sc.binaryFiles("dir").repartition(4).getNumPartitions
4
scala> sc.binaryFiles("dir").repartition(4)
.foreach{ case (name, ds) => {
println(System.currentTimeMillis+": "+name)
Thread.sleep(2000)
// do some reading on the DataStream ds
}}
1603352918396: file:/home/oanicol/sandbox/dir/test1
1603352918396: file:/home/oanicol/sandbox/dir/test3
1603352918396: file:/home/oanicol/sandbox/dir/test4
1603352918396: file:/home/oanicol/sandbox/dir/test5
1603352920397: file:/home/oanicol/sandbox/dir/test2
파이썬의 문제는 binaryFiles
실제로 파일을 하나의 단일 파티션으로 읽는 것입니다. 또한 그것은 나에게 매우 신비 스럽지만 pyspark 2.4의 다음 코드 줄은 의미가없는 것과 동일한 동작을 생성합니다.
# this should work but does not
sc.binaryFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))
# this does not work either, which is strange but it would not be advised anyway
# since all the data would be read on one partition
sc.binaryFiles("dir").repartition(4).foreach(lambda x: some_function(x, ''))
그러나 binaryFiles
실제로 파일 wholeTextFile
을 읽으므로 파일을 텍스트 파일로 읽고 예상대로 작동하는 것을 사용할 수 있습니다 .
# this works
sc.wholeTextFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다