Spring Cloud 数据流将文件流式传输到 HDFS

用户6845507

我是 Spring Cloud Data Flow 的新手。我使用 v 1.7.3 并想创建一个简单的流来扫描新文件的目录并将它们推送到 HDFS。我有以下定义:

file --cron='* * * * * *' --mode=ref --directory=/dir | hdfs --fs-uri=hdfs://myhdpmaster:8020

当我部署我的流时,我有两个问题:

  1. 无论mode我使用哪个文件,都只创建了一个完全没有内容的 hdfs-sink-0.txt 或似乎打印默认 toString() 输出的行(例如'[B@7d5bfc85')。

  2. 当我将新文件放入目录时,HDFS 接收器未收到该消息,尽管我在文件源日志中看到该消息已创建。

我的 hdfs 接收器的输出:

2019-01-25 12:21:06.330  INFO 63 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.1.1
2019-01-25 12:21:06.330  INFO 63 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
2019-01-25 12:21:06.338  INFO 63 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@67110f71
2019-01-25 12:21:06.338  INFO 63 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:inbound.testhdfs1.file.testhdfs1} as a subscriber to the 'bridge.testhdfs1.file' channel
2019-01-25 12:21:06.338  INFO 63 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started inbound.testhdfs1.file.testhdfs1
2019-01-25 12:21:06.340  INFO 63 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2019-01-25 12:21:06.476  INFO 63 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 47888 (http)
2019-01-25 12:21:06.483  INFO 63 --- [           main] s.c.s.a.h.s.k.HdfsSinkKafka10Application : Started HdfsSinkKafka10Application in 17.593 seconds (JVM running for 18.756)
2019-01-25 12:21:08.250  INFO 63 --- [           -C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group testhdfs1.
2019-01-25 12:21:08.256  INFO 63 --- [           -C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group testhdfs1
2019-01-25 12:21:08.256  INFO 63 --- [           -C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$3  : partitions revoked:[]
2019-01-25 12:21:08.256  INFO 63 --- [           -C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group testhdfs1
2019-01-25 12:21:08.522  INFO 63 --- [           -C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group testhdfs1 with generation 1
2019-01-25 12:21:08.526  INFO 63 --- [           -C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [testhdfs1.file-0] for group testhdfs1
2019-01-25 12:21:08.735  INFO 63 --- [           -C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$3  : partitions assigned:[testhdfs1.file-0]
2019-01-25 12:21:23.238  INFO 63 --- [           -L-1] o.s.i.codec.kryo.CompositeKryoRegistrar  : registering [40, java.io.File] with serializer org.springframework.integration.codec.kryo.FileSerializer
2019-01-25 12:21:23.353  INFO 63 --- [           -L-1] o.s.d.h.s.o.AbstractDataStreamWriter     : Creating output for path /data/hdfs-sink-0.txt
珍妮·瓦尔凯拉赫蒂

您不能使用hdfssink将文件复制到 hdfs 中,因为它只是为了写入从源接收的任意消息。您看到该文件长度为零的原因是该文件仍处于打开状态且未刷新。hdfs 接收器自述文件包含配置选项,如果您使用idle-timeoutrollover设置,您将开始看到写入的文件。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Spring Cloud 数据流内部通信

来自分类Dev

文件传输到 HDFS

来自分类Dev

如何将spring cloud任务java jar注册到spring数据流kubernetes中

来自分类Dev

Spring Cloud数据流是否支持批处理

来自分类Dev

Spring Cloud数据流中的Kafka源

来自分类Dev

Spring Cloud数据流-微服务部署

来自分类Dev

Spring Cloud Dataflow与Apache Beam / GCP数据流说明

来自分类Dev

Spring Cloud数据流中的Kafka源

来自分类Dev

如何从 Spring Cloud 数据流创建主题?

来自分类Dev

Spring Cloud 数据流,jdbc-sink-rabbit 源码

来自分类Dev

无法使用Spring Cloud Dataflow将流式数据写入接收器文件

来自分类Dev

使用Flume将文件从远程节点传输到HDFS

来自分类Dev

将文件从 URL 传输到 Cloud Storage

来自分类Dev

Sink组件无法在Spring Cloud数据流中使用Kafka获得正确的数据

来自分类Dev

Spring Cloud数据流和SCDF Kafka数据源

来自分类Dev

使用自己的数据库配置 Spring Cloud 数据流任务

来自分类Dev

Dataproc:HDFS上的热数据,Cloud Storage上的冷数据?

来自分类Dev

PySpark HDFS数据流读取/写入

来自分类Dev

如何在Spring Cloud数据流中处理全局资源?

来自分类Dev

将数据流式传输到Druid服务器

来自分类Dev

使用 RabbitMQ 的 Spring Cloud 流

来自分类Dev

使用spring数据hadoop写入HDFS时的问题

来自分类Dev

将大文件从Google BigQuery传输到Google Cloud Storage

来自分类Dev

如何将AWS EC2 cloud-init日志流式传输到ELK?

来自分类Dev

Shell命令将文件从HDFS传输到Hadoop 2.6.9中的本地文件系统

来自分类Dev

Shell命令将文件从HDFS传输到Hadoop 2.6.9中的本地文件系统

来自分类Dev

从Apache Flink中的HDFS地址流式传输文件

来自分类Dev

从Apache Flink中的HDFS地址流式传输文件

来自分类Dev

使用 Spring Cloud 数据流调度作业时,是否部署/取消部署应用程序实例?

Related 相关文章

  1. 1

    Spring Cloud 数据流内部通信

  2. 2

    文件传输到 HDFS

  3. 3

    如何将spring cloud任务java jar注册到spring数据流kubernetes中

  4. 4

    Spring Cloud数据流是否支持批处理

  5. 5

    Spring Cloud数据流中的Kafka源

  6. 6

    Spring Cloud数据流-微服务部署

  7. 7

    Spring Cloud Dataflow与Apache Beam / GCP数据流说明

  8. 8

    Spring Cloud数据流中的Kafka源

  9. 9

    如何从 Spring Cloud 数据流创建主题?

  10. 10

    Spring Cloud 数据流,jdbc-sink-rabbit 源码

  11. 11

    无法使用Spring Cloud Dataflow将流式数据写入接收器文件

  12. 12

    使用Flume将文件从远程节点传输到HDFS

  13. 13

    将文件从 URL 传输到 Cloud Storage

  14. 14

    Sink组件无法在Spring Cloud数据流中使用Kafka获得正确的数据

  15. 15

    Spring Cloud数据流和SCDF Kafka数据源

  16. 16

    使用自己的数据库配置 Spring Cloud 数据流任务

  17. 17

    Dataproc:HDFS上的热数据,Cloud Storage上的冷数据?

  18. 18

    PySpark HDFS数据流读取/写入

  19. 19

    如何在Spring Cloud数据流中处理全局资源?

  20. 20

    将数据流式传输到Druid服务器

  21. 21

    使用 RabbitMQ 的 Spring Cloud 流

  22. 22

    使用spring数据hadoop写入HDFS时的问题

  23. 23

    将大文件从Google BigQuery传输到Google Cloud Storage

  24. 24

    如何将AWS EC2 cloud-init日志流式传输到ELK?

  25. 25

    Shell命令将文件从HDFS传输到Hadoop 2.6.9中的本地文件系统

  26. 26

    Shell命令将文件从HDFS传输到Hadoop 2.6.9中的本地文件系统

  27. 27

    从Apache Flink中的HDFS地址流式传输文件

  28. 28

    从Apache Flink中的HDFS地址流式传输文件

  29. 29

    使用 Spring Cloud 数据流调度作业时,是否部署/取消部署应用程序实例?

热门标签

归档