Spark 1.2.1 (in local
모드)을 사용하여 파일에서 로그 정보를 추출하고 처리합니다.
파일 크기는 100Mb 이상일 수 있습니다. 파일에는 매우 긴 한 줄이 포함되어 있으므로 정규식을 사용하여이 파일을 로그 데이터 행으로 분할합니다.
MyApp.java
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> txtFileRdd = sc.textFile(filename);
JavaRDD<MyLog> logRDD = txtFileRdd.flatMap(LogParser::parseFromLogLine).cache();
LogParser.java
public static Iterable<MyLog> parseFromLogLine(String logline) {
List<MyLog> logs = new LinkedList<MyLog>();
Matcher m = PATTERN.matcher(logline);
while (m.find()) {
logs.add(new MyLog(m.group(0)));
}
System.out.println("Logs detected " + logs.size());
return logs;
}
처리 된 파일의 실제 크기는 약 100MB이며 실제로 323863
로그 항목을 포함 합니다.
Spark를 사용하여 파일에서 로그 항목을 추출하면 올바르지 않은 455651
[ logRDD.count()
] 로그 항목이 표시됩니다.
파일 파티션으로 인해 발생한다고 생각하며 출력을 확인하면 다음과 같습니다.
Logs detected 18694
Logs detected 113104
Logs detected 323863
그리고 총합은 455651
!
따라서 내 파티션이 서로 병합되어 중복 항목을 유지하는 것을 확인하고 이러한 동작을 방지하고 싶습니다.
해결 방법은 repartition(1)
다음과 같습니다.
txtFileRdd.repartition(1).flatMap(LogParser::parseFromLogLine).cache();
그것은 나에게 원하는 결과 323863
를 제공하지만 성능에 좋은지 의심합니다.
처리 성능을 향상시키는 방법은 무엇입니까?
파티셔닝은 기본적으로 라인 기반입니다. 이것은 하나의 매우 긴 줄이있을 때 흥미로운 방식으로 실패하는 것 같습니다. 이에 대한 버그를 제출하는 것을 고려할 수 있습니다 (이미있을 수 있음).
분할은 Hadoop 파일 API, 특히 TextInputFormat
클래스에 의해 수행됩니다 . 한 가지 옵션은 자체 InputFormat
(전체 파서를 포함 할 수 있음 )를 지정 하고 sc.hadoopFile
.
또 다른 옵션은 다음을 통해 다른 구분 기호를 설정하는 것입니다 textinputformat.record.delimiter
.
// Use space instead of newline as the delimiter.
sc.hadoopConfiguration.set("textinputformat.record.delimiter", " ")
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다