다음과 같은 매우 간단한 Spark 작업이 있습니다.
JavaPairRDD<Key,Value> rawData = newAccumuloRDD(...);
JavaPairRDD<Key,Value> indexSrc =
rawData.filter(new IndexFilter()).cache();
JavaPairRDD<Key,Value> indexEntries =
indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
JavaPairRDD<Key,Value> reverseIndexEntries =
indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
JavaPairRDD<Key,Value> dataEntries =
rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();
dataEntries.union(indexEntries)
.union(reverseIndexEntries)
.repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
.saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class,
AccumuloFileOutputFormat.class, conf);
여기서 Key와 Value는 Apache Accumulo의 Key 및 Value 클래스 (KryoSerializer 사용)입니다.
cache ()에 대한 호출을 정확히 어디에 넣을지 또는 전혀 필요한지 모르겠습니다. 그러나 내 실행자가 내가 할당 한 메모리를 많이 사용하지 않는 것 같습니다.
그리고 응용 프로그램 UI의 "저장소"페이지가 비어 있습니다.
내가 뭔가 잘못했거나 Spark가 내 RDD를 저장하여이 작업을 더 빨리 진행할 수 없다고 결정 했습니까?
사용 된 메모리는 캐싱에 사용되는 메모리를 의미합니다.
코드에서 하나의 작업 만 수행 하고 indexSrc 또는 dataEntries는 다시 사용되지 않으므로 캐싱 할 지점이 없습니다.
그것을 증명하기 위해 추가 할 수 있습니다.
indexSrc.count();
그리고 dataEntries.count();
다음을 선언하고 후 집행자 / 저장 페이지를 확인.
JavaPairRDD<Key,Value> rawData = newAccumuloRDD(...);
JavaPairRDD<Key,Value> indexSrc = rawData.filter(new IndexFilter()).cache();
indexSrc.count();
JavaPairRDD<Key,Value> indexEntries = indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
JavaPairRDD<Key,Value> reverseIndexEntries = indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
JavaPairRDD<Key,Value> dataEntries = rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();
dataEntries.count();
dataEntries.union(indexEntries)
.union(reverseIndexEntries)
.repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
.saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class,
AccumuloFileOutputFormat.class, conf);
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다