Pig:如何将所有元组发送到UDF而不进行分组处理?或者如何在不分组的情况下将元组转换为Bag?

马修·莫森(Matthew Moisen)

这就是我想要做的:

A = LOAD '...' USING PigStorage(',') AS (
    col1:int
    ,col2:chararray
);
B = ORDER A by col2;
C = CUSTOM_UDF(A);

CUSTOM_UDF遍历元组,这些元组必须是有序的。UDF将为每几个输入元组输出一个聚合元组。即,我不是以1:1的方式返回元组。

本质上:

public class CustomUdf extends EvalFunc<Tuple> {
    public Tuple exec(Tuple input) throws IOException {
        Aggregate aggregatedOutput = null;

        DataBag values = (DataBag)input.get(0);
        for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
            Tuple tuple = iterator.next();
            ....
            if (some condition regarding current input tuple){
                //do something to aggregatedOutput with information from input tuple
            } else {
                //Because input tuple does not apply to current aggregateOutput
                //return current aggregateOutput and apply input tuple
                //to new aggregateOutput
                Tuple returnTuple = aggregatedOutput.getTuple();
                aggregatedOutputTuple = new Aggregate(tuple);
                return returnTuple;
            }
        }
    }
    // Establish the output Schema as a tuple
    public Schema outputSchema(Schema input) {
        Schema tupleSchema = new Schema();
        ...
        return new Schema(
            new FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), 
            tupleSchema, 
            DataType.TUPLE));
    }

    /** This inner class is simply a wrapper for the output tuple **/
    class Aggregate {
        //member variables

        public Aggregate(Tuple input) {
            //set member variables to value of input's fields
        }
        public Tuple getTuple() {
            Tuple output = TupleFactory.getInstance().newTuple(5);
            //set tuple's fields to values of member variables
            return output;
        }
    }
}

我已经能够做类似的事情

A = LOAD '...' USING PigStorage(',') AS (
    col1:int
    ,col2:chararray
);
B = ORDER A by col2;
C = GROUP B BY col1;
D = FOREACH C {
    GENERATE CUSTOM_UDF(B);
}

但是,这似乎并没有保留ORDER BY,而且d由于不断收到无效的字段投影,我无法弄清楚如何排序

另外,我不需要分组(它恰好在此用例中起作用),并且只想将B别名作为一堆元组发送给CUSTOM_UDF。

我该如何实现?

物质问题

我认为您对CustomUdf的编写方式有疑问。根据您的描述,听起来应该是EvalFunc <DataBag>,而不是EvalFunc <Tuple>。然后在实现中,当您遍历输入包中的所有元组时,会将累积的元组附加到在方法末尾返回的DataBag。

您的Pig代码将如下所示。我不认为ORDER BY会像您一样保留跨单独语句的顺序。但是,它将在下面的嵌套FOREACH中保留顺序。

A = LOAD '...' USING PigStorage(',') AS (
    col1:int
    ,col2:chararray
);
B = FOREACH (GROUP A ALL) {
   A_ordered = ORDER A BY col2;
   GENERATE FLATTEN(CUSTOM_UDF(A_ordered));
}

exec方法看起来像下面的修改版本。注意我所做的更改。

public DataBag exec(Tuple input) throws IOException { // different return type
    Aggregate aggregatedOutput = null;

    DataBag result = BagFactory.newDefaultBag(); // change here
    DataBag values = (DataBag)input.get(0);
    for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
        Tuple tuple = iterator.next();
        ....
        if (some condition regarding current input tuple){
            //do something to aggregatedOutput with information from input tuple
        } else {
            //Because input tuple does not apply to current aggregateOutput
            //return current aggregateOutput and apply input tuple
            //to new aggregateOutput
            Tuple returnTuple = aggregatedOutput.getTuple();
            aggregatedOutputTuple = new Aggregate(tuple);
            result.add(returnTuple);  // change here
        }
    }
    return result; // change here
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

Related 相关文章

热门标签

归档