我想使用以下设置启动一个logstash实例:
input {
kafka {
topic_id => "topic_a"
.......
}
kafka {
topic_id => "topic_b"
.......
}
}
filter {
json {
source => "message"
}
uuid {
target => "@uuid"
}
mutate {
replace => { "message" => "%{message}" } # want to get the full json literal but does not work
add_field => {
"topic" => "%{topic_id}" # it does not work either
}
}
# logic to apply different filter base on topic_id
if [topic_id] =~ 'topic_a' { # this block seems never entered
mutate {
replace => { "topic" => "topic_a" }
}
} else {
.....
}
}
output {
.....
}
我的Kibana上的输出应如下所示:
topic : %{topic_id}
它建议上面的配置无法提取topic_id。我不知道如何配置过滤器部分。有人可以对此提供提示吗?谢谢。
顺便说一句我正在使用logstash-2.2.2
编辑:根据logstash文档更新了配置,结果仍然相同
默认情况下,Kafka输入插件不包含元数据信息,例如:topic_id ..
您必须启用以下decorate_events
选项:
kafka {
topic_id => "topic_a"
decorate_events => true
}
完成此操作后,您可以在kafka
带有topic
键的数组中找到topic_id 。
decorate_events
值类型为boolean默认值为false将事件添加到Kafka元数据(如主题,消息大小)的选项。这将向logstash事件添加一个名为kafka的字段,其中包含以下属性:msg_size:此消息的完整序列化大小(以字节为单位)(包括crc,标头属性等)主题:该消息与Consumer_group关联的主题:使用者组用于在此事件中读取的分区:该消息与分区关联的键:包含消息键https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins的ByteBuffer -inputs-kafka-decorate_events
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句