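I have just set up the DataTorrent RTS (Apache Apex) platform and run the pi demo. I want to consume "avro" messages from Kafka, then aggregate the data and store it in HDFS. Can I get sample code for this, or for Kafka?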
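Here is the code for a complete working application that uses the new Kafka input operator and the file output operator from Apex Malhar. It converts the byte arrays to strings and writes them to HDFS using rolling files with a bounded size (1K in this example). Until a file reaches the size bound, it has a temporary name with the .tmp extension. You can interpose additional operators between these two, as suggested by DevT in https://stackoverflow.com/a/36666388: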
package com.example.myapexapp;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

@ApplicationAnnotation(name="MyFirstApplication")
public class KafkaApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
    in.setInitialPartitionCount(1);
    in.setTopics("test");
    in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
    //in.setClusters("localhost:2181");
    in.setClusters("localhost:9092"); // NOTE: need broker address, not zookeeper

    LineOutputOperator out = dag.addOperator("out", new LineOutputOperator());
    out.setFilePath("/tmp/FromKafka");
    out.setFileName("test");
    out.setMaxLength(1024); // max size of rolling output file

    // create stream connecting input adapter to output adapter
    dag.addStream("data", in.outputPort, out.input);
  }
}

/**
 * Converts each tuple to a string and writes it as a new line to the output file
 */
class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;
  private String fileName;

  @Override
  public byte[] getBytesForTuple(byte[] t) { return (new String(t, CS) + NL).getBytes(CS); }

  @Override
  protected String getFileName(byte[] tuple) { return fileName; }

  public String getFileName() { return fileName; }
  public void setFileName(final String v) { fileName = v; }
}
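
To picture the rolling behaviour described above (a part file keeps a .tmp name while it is being written and gets its final name once the size bound is reached), here is a minimal plain-Java sketch. It is not Malhar's implementation: the class RollingLineWriter, the part naming scheme (test.0, test.1, ...) and the choice to roll just before a line would exceed the bound are all assumptions made for illustration, and it writes to the local file system rather than HDFS.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper illustrating size-bounded rolling files; not part of Apex Malhar. */
class RollingLineWriter {
    private final Path dir;
    private final String baseName;
    private final long maxLength;   // size bound per part file, in bytes
    private int part = 0;           // index of the part currently being written
    private long written = 0;       // bytes written to the current part so far

    RollingLineWriter(Path dir, String baseName, long maxLength) {
        this.dir = dir;
        this.baseName = baseName;
        this.maxLength = maxLength;
    }

    // Assumed naming scheme: "<base>.<part>.tmp" while writing, "<base>.<part>" when done.
    private Path tmpPath() { return dir.resolve(baseName + "." + part + ".tmp"); }
    private Path finalPath() { return dir.resolve(baseName + "." + part); }

    /** Appends one line; rolls to a new part first if the line would exceed the bound. */
    void writeLine(String line) {
        byte[] bytes = (line + System.lineSeparator()).getBytes(StandardCharsets.UTF_8);
        try {
            if (written > 0 && written + bytes.length > maxLength) {
                roll();
            }
            Files.write(tmpPath(), bytes, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            written += bytes.length;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Renames the current .tmp part to its final name and starts the next part. */
    private void roll() throws IOException {
        Files.move(tmpPath(), finalPath(), StandardCopyOption.REPLACE_EXISTING);
        part++;
        written = 0;
    }

    /** Finalizes the part in progress, if anything has been written to it. */
    void close() {
        try {
            if (written > 0) {
                roll();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("FromKafka");
        RollingLineWriter out = new RollingLineWriter(dir, "test", 1024);
        for (int i = 0; i < 200; i++) {
            out.writeLine("message " + i);
        }
        out.close();
        // List the part files that were produced.
        try (java.util.stream.Stream<Path> files = Files.list(dir)) {
            files.sorted().forEach(p -> System.out.println(p.getFileName()));
        }
    }
}
```

In the real application the operator does this bookkeeping for you; only setMaxLength needs to be set, as in the code above.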