我可以获取使用avro kafka消息的示例代码吗?

维克

我只是建立了Datatorrent RTS(Apache Apex)平台并运行pi演示。我想使用来自kafka的“ avro”消息,然后聚合并将数据存储到hdfs中。我可以为此或kafka获得示例代码吗?

排列

这是使用新的Kafka输入运算符和Apex Malhar的文件输出运算符的完整工作应用程序的代码。它将字节数组转换为字符串,并使用具有限制大小的滚动文件(在此示例中为1K)将它们写到HDFS中。直到文件大小达到界限为止,它将使用带有.tmp扩展名的临时名称您可以按照DevT在https://stackoverflow.com/a/36666388中的建议在这两者之间插入其他运算符

package com.example.myapexapp;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;

import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileLineInputOperator;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

@ApplicationAnnotation(name="MyFirstApplication")
public class KafkaApp implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
    in.setInitialPartitionCount(1);
    in.setTopics("test");
    in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
    //in.setClusters("localhost:2181");
    in.setClusters("localhost:9092");   // NOTE: need broker address, not zookeeper

    LineOutputOperator out = dag.addOperator("out", new LineOutputOperator());
    out.setFilePath("/tmp/FromKafka");
    out.setFileName("test");
    out.setMaxLength(1024);        // max size of rolling output file

    // create stream connecting input adapter to output adapter
    dag.addStream("data", in.outputPort, out.input);
  }
}

/**
 * Converts each tuple to a string and writes it as a new line to the output file
 */
class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;
  private String fileName;

  @Override
  public byte[] getBytesForTuple(byte[] t) { return (new String(t, CS) + NL).getBytes(CS); }

  @Override
  protected String getFileName(byte[] tuple) { return fileName; }

  public String getFileName() { return fileName; }
  public void setFileName(final String v) { fileName = v; }
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

我可以使用array.filter()从数组中获取示例元素吗?

来自分类Dev

我可以获取我的数据库代码吗?

来自分类Dev

我可以通过使用其网址获取Blob吗?

来自分类Dev

我可以在联接中使用委托变量并获取IQueryable吗

来自分类Dev

我可以使用输出变量获取值吗?

来自分类Dev

我可以使用php从localstorage获取数据吗

来自分类Dev

我可以获取成功消息..!

来自分类Dev

我可以使用相同的代码多次获取Instagram的access_token吗?

来自分类Dev

我可以使用相同的代码多次获取Instagram的access_token吗?

来自分类Dev

通过Live SDK获取“过期的令牌”。示例代码正确吗?

来自分类Dev

使用LINQ可以改善我的代码吗?

来自分类Dev

我可以从提交操作中获取进度消息吗?

来自分类Dev

RabbitMQ:我可以获取最新的n条消息吗?

来自分类Dev

Kafka使用者在kafka.apache.org上运行示例时未收到消息

来自分类Dev

kafka将消息放回队列中,可以吗?

来自分类Dev

我可以检索Kafka分区的最新可用偏移量而不检索所有消息吗?

来自分类Dev

我可以在控制器MVC上获取与数据注释一起使用的自定义错误消息吗?

来自分类Dev

我应该使用 <code> 或只读 <textarea> 标记代码示例以供屏幕阅读器使用吗?

来自分类Dev

GET请求消息可以返回HTTP状态代码201吗?

来自分类Dev

我正在为我的Web服务寻找SOAP消息传递的一些示例代码。

来自分类Dev

我可以在MediaWiki Wiki上使用jquery.uls从英文名称中获取语言代码吗?

来自分类Dev

我可以使用Invoke-WebRequest将二进制数据写入文件并仍然获取状态代码吗?

来自分类Dev

使用XMPP,我可以发送没有订阅的消息吗?

来自分类Dev

使用XMPP,我可以发送没有订阅的消息吗?

来自分类Dev

当我在代码中依靠异常消息时,可以确定它是英文的吗?

来自分类Dev

我可以提供多个示例响应吗?

来自分类Dev

我们可以使用“消息”作为(自定义)事件在Google跟踪代码管理器中触发代码吗?

来自分类Dev

我可以使用FileSystemObject使用其索引从文件夹中获取单个文件吗?

来自分类Dev

您能给我消息来源在哪里找到Clojure开发堆栈的示例吗?

Related 相关文章

  1. 1

    我可以使用array.filter()从数组中获取示例元素吗?

  2. 2

    我可以获取我的数据库代码吗?

  3. 3

    我可以通过使用其网址获取Blob吗?

  4. 4

    我可以在联接中使用委托变量并获取IQueryable吗

  5. 5

    我可以使用输出变量获取值吗?

  6. 6

    我可以使用php从localstorage获取数据吗

  7. 7

    我可以获取成功消息..!

  8. 8

    我可以使用相同的代码多次获取Instagram的access_token吗?

  9. 9

    我可以使用相同的代码多次获取Instagram的access_token吗?

  10. 10

    通过Live SDK获取“过期的令牌”。示例代码正确吗?

  11. 11

    使用LINQ可以改善我的代码吗?

  12. 12

    我可以从提交操作中获取进度消息吗?

  13. 13

    RabbitMQ:我可以获取最新的n条消息吗?

  14. 14

    Kafka使用者在kafka.apache.org上运行示例时未收到消息

  15. 15

    kafka将消息放回队列中,可以吗?

  16. 16

    我可以检索Kafka分区的最新可用偏移量而不检索所有消息吗?

  17. 17

    我可以在控制器MVC上获取与数据注释一起使用的自定义错误消息吗?

  18. 18

    我应该使用 <code> 或只读 <textarea> 标记代码示例以供屏幕阅读器使用吗?

  19. 19

    GET请求消息可以返回HTTP状态代码201吗?

  20. 20

    我正在为我的Web服务寻找SOAP消息传递的一些示例代码。

  21. 21

    我可以在MediaWiki Wiki上使用jquery.uls从英文名称中获取语言代码吗?

  22. 22

    我可以使用Invoke-WebRequest将二进制数据写入文件并仍然获取状态代码吗?

  23. 23

    使用XMPP,我可以发送没有订阅的消息吗?

  24. 24

    使用XMPP,我可以发送没有订阅的消息吗?

  25. 25

    当我在代码中依靠异常消息时,可以确定它是英文的吗?

  26. 26

    我可以提供多个示例响应吗?

  27. 27

    我们可以使用“消息”作为(自定义)事件在Google跟踪代码管理器中触发代码吗?

  28. 28

    我可以使用FileSystemObject使用其索引从文件夹中获取单个文件吗?

  29. 29

    您能给我消息来源在哪里找到Clojure开发堆栈的示例吗?

热门标签

归档