我的Kafka二进制文件中产生了一些过程(从Java到字节数组)。
我正在尝试使用Logstash从Kafka消费文件并将文件上传到s3。
我的管道:
input {
kafka {
bootstrap_servers => "my-broker:9092"
topic => "my-topic"
partition_assignment_strategy => "org.apache.kafka.clients.consumer.StickyAssignor"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}
}
filter {
mutate {
remove_field => ["@timestamp", "host"]
}
}
output {
s3 {
region => "eu-west-1"
bucket => "my_bucket"
time_file => 1
prefix => "files/"
rotation_strategy => "time"
}
}
如您所见,我使用了另一个反序列化器类。但是,似乎Logstash默认使用一种将字节数组转换为字符串的编码。我的目标是将文件原样上传到s3。是否有已知的编解码器不对输入数据做任何事情并按原样上传它?
现在,文件已上传到s3,但我看不到它们或打开它们。Logstash破坏了二进制内容。例如,我尝试发送一个内部包含多个文件的gzip,之后我无法在s3中打开它。
我在Logstash上收到的警告:
0-06-02T10:49:29,149][WARN ][logstash.codecs.plain ][my_pipeline] Received an event that has a different character encoding than you configured. {:text=>"7z\\xBC\\xAF'\\u001C\\u0000\\u0002\\xA6j<........more binary data", :expected_charset=>"UTF-8"}
我不确定Logstash是否最适合传递二进制数据,我最终实现了Java使用者,但是以下解决方案对我来说适用于Logstash:
请注意,输出将获得二进制数据,并且您需要对其进行反序列化。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句