使用kafka检测值的变化

尼基尔·萨胡(Nikhil Sahu)

我有一个流应用程序,该应用程序连续接收坐标流以及一些自定义元数据,其中还包括一个位串。使用生产者API将该流产生到kafka主题上。现在,另一个应用程序需要处理此流[Streams API]并存储位字符串中的特定位,并在该位更改时生成警报

以下是需要处理的连续消息流

{"device_id":"1","status_bit":"0"}
{"device_id":"2","status_bit":"1"}
{"device_id":"1","status_bit":"0"}
{"device_id":"3","status_bit":"1"}
{"device_id":"1","status_bit":"1"} // need to generate alert with change: 0->1
{"device_id":"3","status_bits":"1"}
{"device_id":"2","status_bit":"1"}
{"device_id":"3","status_bits":"0"} // need to generate alert with change 1->0

现在,我想将这些警报写到另一个kafka主题,例如

{"device_id":1,"init":0,"final":1,"timestamp":"somets"}
{"device_id":3,"init":1,"final":0,"timestamp":"somets"}

我可以使用以下方法将当前位保存在状态存储中

streamsBuilder
        .stream("my-topic")
        .mapValues((key, value) -> value.getStatusBit())
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
        .reduce((oldAggValue, newMessageValue) -> newMessageValue, Materialized.as("bit-temp-store"));

但我无法理解如何从现有位检测到此更改。我是否需要以某种方式查询处理器拓扑中的状态存储?如是?怎么样?如果不?还有什么可以做的?

我可以尝试的任何建议/想法(可能与我的想法完全不同)。我是Kafka的新手,对事件驱动流的思考使我难以理解。

提前致谢。

卡蒂娅·戈尔什科娃(Katya Gorshkova)

我不确定这是最好的方法,但是在类似的任务中,我使用了一个中间实体来捕获状态变化。在您的情况下,它将类似于

streamsBuilder.stream("my-topic").groupByKey()
          .aggregate(DeviceState::new, new Aggregator<String, Device, DeviceState>() {
        public DeviceState apply(String key, Device newValue, DeviceState state) {
            if(!newValue.getStatusBit().equals(state.getStatusBit())){
                 state.setChanged(true);    
            }
            state.setStatusBit(newValue.getStatusBit());
            state.setDeviceId(newValue.getDeviceId());
            state.setKey(key);
            return state;
        }
    }, TimeWindows.of(…) …).filter((s, t) -> (t.changed())).toStream();

在结果主题中,您将进行更改。您还可以向DeviceState添加一些属性以首先对其进行初始化,具体取决于您是否要发送事件,何时到达第一条设备记录等。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

使用Rxjava检测值变化

来自分类Dev

检测textarea值变化

来自分类Dev

如何使用SwiftUI和Combine检测Datepicker的值变化?

来自分类Dev

使用jQuery检测选择字段的选项值的变化

来自分类Dev

如何使用 JS/Jquery 检测输入的值何时动态变化

来自分类Dev

使用SSE检测readyState的变化

来自分类Dev

使用viewWillTransitionToSize检测方向变化

来自分类Dev

未检测到输入值变化

来自分类Dev

使用javascript或jquery检测struts2自动完成标记的值变化

来自分类Dev

如何使用BehaviorSubject来检测变化

来自分类Dev

使用mutationobservers检测获取请求结果的变化

来自分类Dev

在angularjs中设置值时如何检测输入元素的变化

来自分类Dev

我无法以编程方式检测angularjs中的值变化

来自分类Dev

DOM问题无法检测香草JS中的变化值

来自分类Dev

熊猫在python中检测熊猫系列中日期值的变化

来自分类Dev

在angularjs中设置值时如何检测输入元素的变化

来自分类Dev

如何在GXT组合框中检测实际值变化

来自分类Dev

从jQuery检测PHP foreach内部的输入值变化

来自分类Dev

检测html表格单元格值变化

来自分类Dev

每次更改角度值时如何检测变量的变化

来自分类Dev

如何使用广播接收器检测蓝牙状态变化?

来自分类Dev

jQuery使用keyup()检测两个元素的变化

来自分类Dev

如何使用Node.js检测网络变化

来自分类Dev

如何在React中使用React Router检测路由变化?

来自分类Dev

如何在React中使用React Router检测路由变化?

来自分类Dev

如果不使用路线,则检测角度的位置变化

来自分类Dev

jQuery使用keyup()检测两个元素的变化

来自分类常见问题

如何检测方向变化?

来自分类Dev

如何检测元素的变化

Related 相关文章

热门标签

归档