我有一个流,events(someid:String, name:String)
出于监视的原因,我需要每个事件ID都有一个计数器。在所有Flink文档和示例中,我可以看到该计数器是,例如,open
以map函数的名称初始化。
但是在我的情况下,我无法初始化计数器,因为每个eventId需要一个计数器,并且我不预先知道该值。而且,我知道每当map()
MapFunction的方法中一次通过时创建一个新的计数器将是多么昂贵。最后,我不能保留计数器的“缓存”,因为它太大了。
理想情况下,我想要这样的东西:
class Event(id: String, name: String)
class ExampleMapFunction extends RichMapFunction[Event, Event] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
counter = new Counter()
}
override def map(event: Event): Event = {
counter.inc(event.id)
event
}
}
还是基本上我可以实现自己的计数器以允许我传递尺寸?如果是,如何?
对于这种用例有什么建议或最佳实践吗?
如果保留计数器的缓存太大,那么我认为使用指标不会满足您的需求进行扩展。
一些选择:
使用侧面输出在某些外部可查询/可视化数据存储中收集有意义的事件,例如influxdb。
将信息保持在键控状态,并根据需要使用广播消息触发其相关部分的输出(再次使用侧面输出)。
将信息保持为键控状态,并获取定期保存点,然后使用状态处理器API通过查询进行分析。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句