I want to parse PDF files in my Hadoop 2.2.0 program. I found this guide, followed what it says, and by now I have these three classes:

PDFWordCount: the main class containing the map and reduce functions (just like the native Hadoop wordcount sample, but using my PDFInputFormat class instead of TextInputFormat).

PDFRecordReader extends RecordReader<LongWritable, Text>: which does the main work here. In particular, I have included its initialize function below for illustration.
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
  System.out.println("initialize");
  System.out.println(genericSplit.toString());
  FileSplit split = (FileSplit) genericSplit;
  System.out.println("filesplit convertion has been done");
  final Path file = split.getPath();
  Configuration conf = context.getConfiguration();
  conf.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
  FileSystem fs = file.getFileSystem(conf);
  System.out.println("fs has been opened");
  start = split.getStart();
  end = start + split.getLength();
  System.out.println("going to open split");
  FSDataInputStream filein = fs.open(split.getPath());
  System.out.println("going to load pdf");
  PDDocument pd = PDDocument.load(filein);
  System.out.println("pdf has been loaded");
  PDFTextStripper stripper = new PDFTextStripper();
  in = new LineReader(new ByteArrayInputStream(
      stripper.getText(pd).getBytes("UTF-8")));
  start = 0;
  this.pos = start;
  System.out.println("init has finished");
}
(You can see my System.out.println calls, which I use for debugging.) This method fails to convert genericSplit into a FileSplit. The last thing I see in the console is:

hdfs://localhost:9000/in:0+9396432

which is genericSplit.toString().
PDFInputFormat extends FileInputFormat<LongWritable, Text>: which just creates a new PDFRecordReader in its createRecordReader method.

I want to know what my mistake is. Do I need any extra classes?
Reading PDFs is not that difficult; you need to extend the FileInputFormat class as well as RecordReader. The FileInputFormat should not be allowed to split PDF files, since they are binary.
public class PDFInputFormat extends FileInputFormat<Text, Text> {

  @Override
  public RecordReader<Text, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new PDFLineRecordReader();
  }

  // Never split PDF files, even if they are larger than the HDFS block size
  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    return false;
  }
}
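For completeness, this input format still has to be registered on the job in the driver class. A minimal driver sketch might look like the following; the mapper/reducer class names and paths are assumptions for illustration, not part of the original post:

```java
// Hypothetical driver wiring: PDFInputFormat replaces TextInputFormat.
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "pdf word count");
job.setJarByClass(PDFWordCount.class);
job.setInputFormatClass(PDFInputFormat.class);  // instead of TextInputFormat
job.setMapperClass(PDFWordCount.MyMap.class);
job.setReducerClass(PDFWordCount.MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
```

This is a configuration fragment that requires a Hadoop cluster and the project's mapper/reducer classes to run.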
The RecordReader then performs the reading itself (I am using PDFBox to read the PDFs).
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;

public class PDFLineRecordReader extends RecordReader<Text, Text> {

  private Text key = new Text();
  private Text value = new Text();
  private int currentLine = 0;
  private List<String> lines = null;
  private PDDocument doc = null;
  private PDFTextStripper textStripper = null;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    FileSplit fileSplit = (FileSplit) split;
    final Path file = fileSplit.getPath();
    Configuration conf = context.getConfiguration();
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream filein = fs.open(fileSplit.getPath());

    if (filein != null) {
      doc = PDDocument.load(filein);
      // Could the PDF be read?
      if (doc != null) {
        textStripper = new PDFTextStripper();
        String text = textStripper.getText(doc);
        lines = Arrays.asList(text.split(System.lineSeparator()));
        currentLine = 0;
      }
    }
  }

  // Returning false ends the reading process
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
      key = new Text();
    }
    if (value == null) {
      value = new Text();
    }
    if (currentLine < lines.size()) {
      String line = lines.get(currentLine);
      key.set(line);
      value.set("");
      currentLine++;
      return true;
    } else {
      // All lines read? -> end
      key = null;
      value = null;
      return false;
    }
  }

  @Override
  public Text getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  @Override
  public Text getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return (100.0f / lines.size() * currentLine) / 100.0f;
  }

  @Override
  public void close() throws IOException {
    // Close the document when done
    if (doc != null) {
      doc.close();
    }
  }
}
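The key/value iteration above follows the standard RecordReader contract: each call to nextKeyValue either advances to the next line and returns true, or nulls the key and returns false. Stripped of the Hadoop and PDFBox types, the same state machine can be sketched in plain Java; the LineIterator class here is a hypothetical stand-in for illustration, not part of the original code:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in that mirrors PDFLineRecordReader's iteration logic
// without any Hadoop or PDFBox dependencies.
public class LineIterator {
    private final List<String> lines;
    private int currentLine = 0;
    private String key = null;

    public LineIterator(List<String> lines) {
        this.lines = lines;
    }

    // Mirrors nextKeyValue(): advance and report whether a record is available
    public boolean nextKeyValue() {
        if (currentLine < lines.size()) {
            key = lines.get(currentLine);
            currentLine++;
            return true;
        }
        key = null; // all lines read -> end
        return false;
    }

    public String getCurrentKey() {
        return key;
    }

    // Mirrors getProgress(): fraction of lines consumed so far
    public float getProgress() {
        return (float) currentLine / lines.size();
    }

    public static void main(String[] args) {
        LineIterator it = new LineIterator(Arrays.asList("first", "second", "third"));
        int count = 0;
        while (it.nextKeyValue()) {
            System.out.println(it.getCurrentKey());
            count++;
        }
        System.out.println("records=" + count + " progress=" + it.getProgress());
    }
}
```

The framework drives exactly this loop: it calls nextKeyValue until it returns false, fetching each record with getCurrentKey/getCurrentValue in between.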
Hope this helps!