Here is what I'm trying to do:
A = LOAD '...' USING PigStorage(',') AS (
col1:int
,col2:chararray
);
B = ORDER A by col2;
C = CUSTOM_UDF(B); -- intent only: pass the ordered relation B to the UDF (not valid Pig Latin as written)
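CUSTOM_UDF iterates over the tuples, which must be in sorted order. The UDF emits one aggregated tuple for every few input tuples; that is, input tuples do not map to output tuples 1:1.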
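Why ordering matters for this kind of many-to-one pass can be shown with a minimal plain-Java sketch (no Pig classes involved). It assumes, purely for illustration, that an aggregate closes whenever the key changes between adjacent tuples; `RunCounter` and `runs` are hypothetical names.

```java
import java.util.List;

class RunCounter {
    // Hypothetical rule: a new aggregate starts whenever the key differs from
    // the previous tuple's key, so each maximal run of equal adjacent keys
    // becomes one output tuple. Keys are assumed to be non-null.
    static int runs(List<String> keys) {
        int count = 0;
        String previous = null;
        for (String key : keys) {
            if (previous == null || !previous.equals(key)) {
                count++; // key changed: the previous aggregate closes, a new one opens
            }
            previous = key;
        }
        return count;
    }
}
```

Under this assumption, `runs(Arrays.asList("a", "a", "b", "b"))` gives 2 aggregates, while the same keys unsorted, `("a", "b", "a", "b")`, give 4, which is why the UDF needs its input bag sorted.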
In essence:
import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

public class CustomUdf extends EvalFunc<Tuple> {
    public Tuple exec(Tuple input) throws IOException {
        Aggregate aggregatedOutput = null;
        DataBag values = (DataBag) input.get(0);
        for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
            Tuple tuple = iterator.next();
            ....
            if (some condition regarding current input tuple) {
                // do something to aggregatedOutput with information from input tuple
            } else {
                // Because the input tuple does not apply to the current aggregatedOutput,
                // return the current aggregatedOutput and apply the input tuple
                // to a new aggregatedOutput
                Tuple returnTuple = aggregatedOutput.getTuple();
                aggregatedOutput = new Aggregate(tuple);
                return returnTuple; // leaves exec(), so each call emits at most one aggregate
            }
        }
        return null; // reached only if the else branch never ran
    }

    // Establish the output Schema as a tuple
    public Schema outputSchema(Schema input) {
        try {
            Schema tupleSchema = new Schema();
            ...
            return new Schema(
                    new FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
                            tupleSchema,
                            DataType.TUPLE));
        } catch (Exception e) {
            // FieldSchema(String, Schema, byte) declares a checked FrontendException
            return null;
        }
    }

    /** This inner class is simply a wrapper for the output tuple **/
    class Aggregate {
        // member variables

        public Aggregate(Tuple input) {
            // set member variables to the values of input's fields
        }

        public Tuple getTuple() {
            Tuple output = TupleFactory.getInstance().newTuple(5);
            // set the tuple's fields to the values of the member variables
            return output;
        }
    }
}
I have been able to do something similar:
A = LOAD '...' USING PigStorage(',') AS (
col1:int
,col2:chararray
);
B = ORDER A by col2;
C = GROUP B BY col1;
D = FOREACH C {
    GENERATE CUSTOM_UDF(B);
}
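However, this doesn't seem to preserve the ORDER BY, and I can't figure out how to sort within D because I keep getting "invalid field projection" errors.
Also, I don't actually need the grouping (it just happens to work for this use case); I only want to send the B alias to CUSTOM_UDF as a bag of tuples.
How can I achieve this?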
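I think the problem lies in how CustomUdf is written. From your description, it should be an EvalFunc<DataBag> rather than an EvalFunc<Tuple>: exec is called once per input tuple and returns a single value, so returning from inside the loop can only ever produce one aggregate per call. Instead, as you iterate over all the tuples in the input bag, add each completed aggregate tuple to a DataBag and return that bag at the end of the method.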
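The difference between the two return types can be sketched in plain Java without Pig. In this hypothetical example an "aggregate" is just the length of a run of equal adjacent keys; `returnInsideLoop` mirrors the question's EvalFunc<Tuple> shape and `collectAll` mirrors the EvalFunc<DataBag> shape, with a `List` standing in for the DataBag. All names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

class ReturnVsCollect {
    // Question's shape: returning inside the loop ends the call,
    // so at most one aggregate (a run length) ever comes back.
    // Returns null for empty input.
    static Integer returnInsideLoop(List<String> keys) {
        String current = null;
        int length = 0;
        for (String key : keys) {
            if (current != null && current.equals(key)) {
                length++;            // same run: extend the current aggregate
            } else if (current == null) {
                current = key;       // first tuple: open the first aggregate
                length = 1;
            } else {
                return length;       // run ended: the method exits right here
            }
        }
        return current == null ? null : length;
    }

    // Answer's shape: collect every finished aggregate, then add the last
    // one after the loop, and return them all together.
    static List<Integer> collectAll(List<String> keys) {
        List<Integer> result = new ArrayList<>();
        String current = null;
        int length = 0;
        for (String key : keys) {
            if (current != null && current.equals(key)) {
                length++;
            } else {
                if (current != null) {
                    result.add(length);  // close the previous aggregate
                }
                current = key;           // open a new aggregate with this tuple
                length = 1;
            }
        }
        if (current != null) {
            result.add(length);          // without this, the final aggregate is lost
        }
        return result;
    }
}
```

For the keys a, a, b, b, b, `returnInsideLoop` yields only 2, whereas `collectAll` yields [2, 3]; the last run appears only because of the add after the loop.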
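Your Pig code would look like the following. Pig does not guarantee that the order produced by an ORDER BY statement is preserved by later statements such as GROUP, so sorting B beforehand doesn't help. An ORDER BY inside a nested FOREACH, however, does order the bag that is passed to the UDF, as below.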
A = LOAD '...' USING PigStorage(',') AS (
col1:int
,col2:chararray
);
B = FOREACH (GROUP A ALL) {
    A_ordered = ORDER A BY col2;
    GENERATE FLATTEN(CUSTOM_UDF(A_ordered));
}
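What this nested block does to the data can be mimicked in plain Java, assuming GROUP A ALL yields one bag holding every row: sort that bag by col2, hand it to the UDF, then FLATTEN the returned bag so each element becomes its own output row. `Row`, `orderByCol2` and `flatten` are hypothetical names, and lexicographic `String` ordering is an assumption standing in for Pig's chararray ordering.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class NestedOrderSketch {
    // Hypothetical stand-in for a row of A: (col1:int, col2:chararray).
    static final class Row {
        final int col1;
        final String col2;

        Row(int col1, String col2) {
            this.col1 = col1;
            this.col2 = col2;
        }
    }

    // Analogue of A_ordered = ORDER A BY col2 applied to the single bag
    // produced by GROUP A ALL; returns a sorted copy, input is unchanged.
    static List<Row> orderByCol2(List<Row> bag) {
        List<Row> sorted = new ArrayList<>(bag);
        sorted.sort(Comparator.comparing((Row r) -> r.col2));
        return sorted;
    }

    // Analogue of FLATTEN on the bag(s) returned by the UDF:
    // every element of every bag becomes its own output row.
    static <T> List<T> flatten(List<List<T>> bagsPerGroup) {
        List<T> rows = new ArrayList<>();
        for (List<T> bag : bagsPerGroup) {
            rows.addAll(bag);
        }
        return rows;
    }
}
```

Copying in `orderByCol2` leaves the input list untouched, much as ORDER BY produces a new relation rather than modifying A.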
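The exec method would then look like the following modified version; note the lines marked "change here". Because exec now returns a bag, outputSchema should likewise declare a bag of tuples (DataType.BAG) instead of a tuple.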
// needs: import org.apache.pig.data.BagFactory;
public DataBag exec(Tuple input) throws IOException { // different return type
    Aggregate aggregatedOutput = null;
    DataBag result = BagFactory.getInstance().newDefaultBag(); // change here
    DataBag values = (DataBag) input.get(0);
    for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
        Tuple tuple = iterator.next();
        ....
        if (some condition regarding current input tuple) {
            // do something to aggregatedOutput with information from input tuple
        } else {
            // Because the input tuple does not apply to the current aggregatedOutput,
            // add the current aggregatedOutput to the result and apply the input tuple
            // to a new aggregatedOutput
            if (aggregatedOutput != null) {
                result.add(aggregatedOutput.getTuple()); // change here
            }
            aggregatedOutput = new Aggregate(tuple);
        }
    }
    if (aggregatedOutput != null) {
        result.add(aggregatedOutput.getTuple()); // change here: emit the last aggregate too
    }
    return result; // change here
}