다음 코드로 가져온 2 개의 RDD가 있습니다.
val fileA = sc.textFile("fileA.txt")
val fileB = sc.textFile("fileB.txt")
그런 다음 키로 매핑하고 축소합니다.
val countsB = fileB.flatMap(line => line.split("\n"))
.map(word => (word, 1))
.reduceByKey(_+_)
val countsA = fileA.flatMap(line => line.split("\n"))
.map(word => (word, 1))
.reduceByKey(_+_)
이제 키가 countA에 존재하는 경우 countB의 모든 키를 찾아 제거하고 싶지 않습니다.
나는 다음과 같은 것을 시도했다.
countsB.keys.foreach(b => {
if(countsB.collect().exists(_ == b)){
countsB.collect().drop(countsB.collect().indexOf(b))
}
})
그러나 그것은 열쇠로 그들을 제거하는 것처럼 보이지 않습니다.
제안 된 코드에는 3 가지 문제가 있습니다.
당신은하는 collect
데이터 집합이 큰 경우에 당신을 잃게 스파크의 병렬 처리 및 위험에서 OutOfMemory 오류 때문에, 그들은 더 이상, 그들은 일반 스칼라 컬렉션으로 드라이버 응용 프로그램의 메모리에 복사됩니다 RDDs하지 않은 수단 RDDs을 보내고
drop
변경 불가능한 Scala 컬렉션 (또는 RDD
)을 호출 할 때 원본 컬렉션을 변경하지 않고 해당 레코드가 삭제 된 새 컬렉션 을 가져 오므로 원본 컬렉션이 변경 될 것으로 기대할 수 없습니다.
RDD
RDD의 상위 메서드 (예 : foreach
이 경우)에 전달 된 함수 내에서 액세스 할 수 없습니다. 이 메서드에 전달 된 모든 함수는 직렬화되어 작업자에게 전송되며 RDD
s는 (의도적으로) 직렬화 할 수 없습니다. 드라이버 메모리로 가져 와서 직렬화 한 다음 작업자에게 다시 보내기 위해 데이터가 이미 작업자에 배포되어 있습니다!
이 모든 문제를 해결하려면 한 RDD의 데이터를 사용하여 다른 RDD를 변환 / 필터링하려는 경우 일반적으로 join
. 이 경우 다음을 수행 할 수 있습니다.
// left join, and keep only records for which there was NO match in countsA:
countsB.leftOuterJoin(countsA).collect { case (key, (valueB, None)) => (key, valueB) }
참고이 collect
내가 여기에 사용하고하는 것은 아니라는 것을 collect
당신이 사용 -이 일이 소요 PartialFunction
인수로, 그리고 동작합니다의 조합과 같이 map
하고 filter
, 그리고 가장 중요한 :이 드라이버 메모리에 모든 데이터를 복사하지 않습니다.
편집 : Archetypal Paul이 언급했듯이-훨씬 더 짧고 더 좋은 옵션이 있습니다- subtractByKey
:
countsB.subtractByKey(countsA)
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다