Hadoop,成功完成Map Reduce作业,但没有输出

数学家失败

客观的:

我正在尝试使用Map Reduce合并数据。我在同一文件夹中有多组数据。

方法:

因此,我在程序/流中依次多次运行Map Reduce合并作业。

问题:

我面临的问题不是失败的作业,而是没有输出的成功作业。第一个(有时是两个)迭代始终具有输出(part-r-00000),但随后没有。我使用的样本数据集的大小和体积非常小(1〜2 kb,大约5个文件)

我试过的

每次运行后使线程休眠5秒,但无济于事。我试过一会儿再使用webhdfs进行检查,但仍然没有这样的文件。

你能给我启发一下吗?提前致谢。

图片:

问题

代码:

/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package mergedata;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 *
 * @author abcdefg
 */
public class MergeData extends Configured implements Tool{

/**
 * @param args the command line arguments
 */
public static class ReadMapper
extends Mapper<Object, Text, Text, IntWritable>{
    @Override
    public void map(Object key, Text value, Mapper.Context context
    ) throws IOException, InterruptedException {

        context.write(new Text(value.toString()),  new IntWritable(1));
    }
}

public static class MergeReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
            Reducer.Context context
    ) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

@Override
public int run(String[] args) throws Exception {

    Configuration conf = getConf();

    FileSystem hdfs = FileSystem.get(conf);

    args = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (args.length != 3) {
        System.err.println(args.length);
        System.err.println("Usage: mergedata <input folder> <temporary folder> <output folder>");
        System.exit(1);
    }
//        FileSystem fs = FileSystem.get(conf);
//        ContentSummary cs = fs.getContentSummary(new Path(args[0]));
//        long fileCount = cs.getFileCount();

    Job job = Job.getInstance(conf);

    job.setJarByClass(MergeData.class);
    job.setMapperClass(ReadMapper.class);
    job.setReducerClass(MergeReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
//        String files = ().replaceAll(",", "," + args[0] + "/");
//        FileInputFormat.addInputPaths(job, files);

    int jobComplete = 1;
    FileStatus[] fileStatus = hdfs.listStatus(new Path(args[0]));

    HashMap<String,Pair<String,Long>> map = new HashMap<String,Pair<String,Long>>();

    String tempName;
    String tempKey;
    Path tempPath;
    for (FileStatus fileStatu : fileStatus) {

        tempPath = fileStatu.getPath();
        tempName = tempPath.getName();
        tempKey = tempName.substring(0,12);
        if (map.containsKey(tempKey)) {
            map.put(tempKey,new Pair(map.get(tempKey).getLeft() + "," +
                    tempPath.toString(),
                    map.get(tempKey).getRight() + fileStatu.getLen()));
        } else {
            map.put(tempKey, new Pair(tempPath.toString(),fileStatu.getLen()));
        }
    }

    String[] files = map.keySet().toArray(new String[map.keySet().size()]);
    String[] inputFiles;
//        String[] files = args[1].split(",");
    for (String file : files)
    {
        System.out.println("file = " + file);
//            FileInputFormat.addInputPath(job, new Path(args[0] + "/" + file + "*"));
        System.out.println(args[2] + "/" + file);
        if (hdfs.exists(new Path(args[2] + "/" + file))) {
            System.out.println(file + " exists in " + args[2]);
            map.put(file,new Pair(
                    map.get(file).getLeft() + "," + args[2] + "/" + file,
                    map.get(file).getRight() + hdfs.getFileStatus(new Path(args[2] + "/" + file)).getLen()
            ));
        }
        System.out.println("MR job input files : " + map.get(file).getLeft());
        FileInputFormat.setInputPaths(job, map.get(file).getLeft());

        System.out.println("MR job output dir : " + args[1] + "/" + file);
        FileOutputFormat.setOutputPath(job ,new Path(args[1] + "/" + file));
        if (hdfs.exists(new Path(args[1] + "/" + file))) {
            hdfs.delete(new Path(args[1] + "/" + file), true); // Shouldn't occur
        }
        jobComplete = Math.max(jobComplete, (job.waitForCompletion(true))? 0 : 1);
            // hdfs.getFileStatus(tempFile)
        if (job.isSuccessful()) {
                // Following sequence includes size check before deleting files

            FileStatus[] filesStatuz = hdfs.listStatus(new Path(args[1] + "/" + file + "/part-r-00000"));

            System.out.println("filesStatuz[0].getLen() = " + filesStatuz[0].getLen());
            System.out.println("totalLen = " + map.get(file).getRight());
            if (filesStatuz[0].getLen() >= map.get(file).getRight()) {

                if (hdfs.exists(new Path(args[2] + "/" + file))) {
                    System.out.println("Found the main file of " + file);
                    hdfs.rename(new Path(args[2] + "/" + file), new Path(args[2] + "/" + file + "_tmp"));
                }
                hdfs.rename(new Path(args[1] + "/" + file + "/part-r-00000"), new Path(args[2] + "/" + file));
                hdfs.delete(new Path(args[1] + "/" + file), true);
                System.out.println("Done safe replacement");

//                    hdfs.delete(new Path(args[0] + "/" + file + "*"), false);
                inputFiles = map.get(file).getLeft().split(",");
                for (String inputFile : inputFiles) {
                    if (!inputFile.equals(args[2] + "/" + file)) {
                        hdfs.delete(new Path(inputFile), false);
                        System.out.println(inputFile + " has been deleted");
                    }
                }
                if (hdfs.exists(new Path(args[2] + "/" + file + "_tmp"))) {
                    hdfs.delete(new Path(args[2] + "/" + file + "_tmp"), false);
                    System.out.println("Deleted previous main file of " + file);
                }
            }
            else {
                System.out.println("Merging of " + file +"might have failed. Input and output size doesn't tally");
            }
        }         
    }
    return(jobComplete);
}

public static void main(String[] args) throws Exception {
    // TODO code application logic here
    int exitCode = ToolRunner.run(new MergeData(), args);
    System.exit(exitCode);
}

public class Pair<L,R> {

    private final L left;
    private final R right;

    public Pair(L left, R right) {
        this.left = left;
        this.right = right;
    }
    public L getLeft() { return left; }
    public R getRight() { return right; }

    @Override
    public int hashCode() { return left.hashCode() ^ right.hashCode(); }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Pair)) return false;
        Pair pairo = (Pair) o;
        return this.left.equals(pairo.getLeft()) &&
                this.right.equals(pairo.getRight());
    }

}
}

流动:

本质是它将合并日期相似的文件,例如:输入文件夹(args [0])中的cdr_20150701_0,cdr_20150701_1到主文件,例如cdr_20150701,并放置在合并文件夹(args [3])中。但是,如果在合并之前存在这样的主文件,则所有文件(例如:cdr_20150701_0,cdr_20150701_1和cdr_20150701)将合并到新的cdr_20150701。部分0-00000将存储在一个临时文件夹(args [1])中。成功传输后,临时文件夹和部件将被删除。

阿图罗·迪纳雷特(Arturo Dinaret)

您是否尝试使用命令getmerge,也许在您的情况下可能有用。如果仅对数据进行合并,则可能不需要仅针对合并的map reduce作业。

hadoop fs -getmerge [addnl]

将源目录和目标文件作为输入,并将src中的文件串联到目标本地文件中。可以选择将addnl设置为启用,以在每个文件的末尾添加换行符。

http://hadoop.apache.org/docs/r2.7.0/hadoop-project-dist/hadoop-common/FileSystemShell.html

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

运行Hadoop Map-Reduce作业

来自分类Dev

Map Reduce作业生成空的输出文件

来自分类Dev

hadoop map减少作业无输出

来自分类Dev

hadoop map减少作业无输出

来自分类Dev

使用hadoop和Java命令执行map-reduce作业之间有什么区别

来自分类Dev

MongoDB Map Reduce操作输出中的数据是什么意思?有没有可能隐藏它?

来自分类Dev

ftp mget没有显示成功完成

来自分类Dev

Cron作业没有输出

来自分类Dev

如何为Hadoop的Map-reduce作业设置配置?

来自分类Dev

输入与输出:作业成功完成,但是缺少一些输出文件

来自分类Dev

执行完成后,Jenkins作业没有停止

来自分类Dev

AWS Rekognition FaceSearch - 没有作业完成通知

来自分类Dev

Amazon Elastic Map Reduce:作业流程失败,因为尚未生成输出文件

来自分类Dev

Hadoop作业执行(如果群集中没有空间)

来自分类Dev

即使发生IOException,作业仍成功完成

来自分类Dev

Map / Reduce:完成后如何输出Hashmap?

来自分类Dev

如何在hadoop map reduce中写avro输出?

来自分类Dev

等待Hadoop中的几个作业完成

来自分类Dev

Hadoop挂机和作业完成通知

来自分类Dev

使用qsub提交的作业没有输出?

来自分类Dev

放置没有地图界面的自动完成功能

来自分类Dev

EiffelStudio:本地实体没有自动完成功能

来自分类Dev

Hadoop Map Reduce哈希程序

来自分类Dev

Hadoop Map-Reduce编程

来自分类Dev

尝试运行Map Reduce作业时显示“不是有效的Jar”

来自分类Dev

尝试运行Map Reduce作业时显示“不是有效的Jar”

来自分类Dev

在Map Reduce作业Hadoop中将文件中的数据用作哈希映射

来自分类Dev

用于日志分析的 Map Reduce 作业不在 Hadoop 2.7.3 伪分布式模式下运行

来自分类Dev

例外:没有js引擎,无法运行map reduce

Related 相关文章

  1. 1

    运行Hadoop Map-Reduce作业

  2. 2

    Map Reduce作业生成空的输出文件

  3. 3

    hadoop map减少作业无输出

  4. 4

    hadoop map减少作业无输出

  5. 5

    使用hadoop和Java命令执行map-reduce作业之间有什么区别

  6. 6

    MongoDB Map Reduce操作输出中的数据是什么意思?有没有可能隐藏它?

  7. 7

    ftp mget没有显示成功完成

  8. 8

    Cron作业没有输出

  9. 9

    如何为Hadoop的Map-reduce作业设置配置?

  10. 10

    输入与输出:作业成功完成,但是缺少一些输出文件

  11. 11

    执行完成后,Jenkins作业没有停止

  12. 12

    AWS Rekognition FaceSearch - 没有作业完成通知

  13. 13

    Amazon Elastic Map Reduce:作业流程失败,因为尚未生成输出文件

  14. 14

    Hadoop作业执行(如果群集中没有空间)

  15. 15

    即使发生IOException,作业仍成功完成

  16. 16

    Map / Reduce:完成后如何输出Hashmap?

  17. 17

    如何在hadoop map reduce中写avro输出?

  18. 18

    等待Hadoop中的几个作业完成

  19. 19

    Hadoop挂机和作业完成通知

  20. 20

    使用qsub提交的作业没有输出?

  21. 21

    放置没有地图界面的自动完成功能

  22. 22

    EiffelStudio:本地实体没有自动完成功能

  23. 23

    Hadoop Map Reduce哈希程序

  24. 24

    Hadoop Map-Reduce编程

  25. 25

    尝试运行Map Reduce作业时显示“不是有效的Jar”

  26. 26

    尝试运行Map Reduce作业时显示“不是有效的Jar”

  27. 27

    在Map Reduce作业Hadoop中将文件中的数据用作哈希映射

  28. 28

    用于日志分析的 Map Reduce 作业不在 Hadoop 2.7.3 伪分布式模式下运行

  29. 29

    例外:没有js引擎,无法运行map reduce

热门标签

归档