Kafka Streams:第n个事件的动作

忍者

我正在尝试找到在Kafka Streams中n个事件执行操作的最佳方法

我的情况:我有一个带有一些Events的输入流我必须通过eventType == login过滤它们,并在每次n次登录(假设是第五次)时将同一accountId将此Event发送到输出流。

经过一些调查和不同的尝试,我有了下面的代码版本(我使用的是Kotlin)。

data class Event(
    val payload: Any = {},
    val accountId: String,
    val eventType: String = ""
)
// intermediate class to keep the key and value of the original event
data class LoginEvent(
    val eventKey: String,
    val eventValue: Event
)
fun process() {
        val userLoginsStoreBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("logins"),
            Serdes.String(),
            Serdes.Integer()
        )
        val streamsBuilder = StreamsBuilder().addStateStore(userCheckInsStoreBuilder)
        val inputStream = streamsBuilder.stream<String, String>(inputTopic)

        inputStream.map { key, event ->
            KeyValue(key, json.readValue<Event>(event))
        }.filter { _, event -> event.eventType == "login" }
             .map { key, event -> KeyValue(event.accountId, LoginEvent(key, event)) }
             .transform(
                    UserLoginsTransformer("logins", 5),
                    "logins"
                )
             .filter { _, value -> value }
             .map { key, _ -> KeyValue(key.eventKey, json.writeValueAsString(key.eventValue)) }
             .to("fifth_login", Produced.with(Serdes.String(), Serdes.String()))

        ...
    }
class UserLoginsTransformer(private val storeName: String, private val loginsThreshold: Int = 5) :
    TransformerSupplier<String, CheckInEvent, KeyValue< LoginEvent, Boolean>> {

    override fun get(): Transformer<String, LoginEvent, KeyValue< LoginEvent, Boolean>> {
        return object : Transformer<String, LoginEvent, KeyValue< LoginEvent, Boolean>> {
            private lateinit var store: KeyValueStore<String, Int>

            @Suppress("UNCHECKED_CAST")
            override fun init(context: ProcessorContext) {
                store = context.getStateStore(storeName) as KeyValueStore<String, Int>
            }

            override fun transform(key: String, value: LoginEvent): KeyValue< LoginEvent, Boolean> {
                val counter = (store.get(key) ?: 0) + 1
                return if (counter == loginsThreshold) {
                    store.delete(key)
                    KeyValue(value, true)
                } else {
                    store.put(key, counter)
                    KeyValue(value, false)
                }
            }

            override fun close() {
            }
        }
    }
}

我最大的担心是transform我的情况下函数不是线程安全的。我已经检查了我的案例中使用的KV存储的实现,这是RocksDB存储(非事务性),因此该值可能会在读取和比较之间更新,并且错误的事件将发送到输出。

我的其他想法:

  1. 可以将物化视图用作没有转换器的存储,但是我坚持实现。
  2. 创建一个将使用TransactionalRocksDB的自定义持久性KV存储(不确定是否值得)。
  3. 创建一个自定义的持久性KV存储,该存储将在内部使用ConcurrentHashMap(在我们期望有许多用户的情况下,这可能会导致高内存消耗)。

还有一点要注意:我正在使用Spring Cloud Stream,所以也许该框架针对我的情况有一个内置的解决方案,但是我没有找到它。

我将不胜感激任何建议。提前致谢。

马蒂亚斯·萨克斯

我最大的担心是在我的情况下,转换函数不是线程安全的。我已经检查了我的案例中使用的KV存储的实现,这是RocksDB存储(非事务性),因此该值可能会在读取和比较之间更新,并且错误的事件将发送到输出。

没有理由要担心。如果使用多个线程运行,则每个线程将拥有自己的RocksDB,该RocksDB存储整个数据中的一个分片(请注意,总体状态是基于输入主题分区进行分片的,并且单个分片永远不会由不同的线程处理)。因此,您的代码将正常工作。您唯一需要确保的是,数据是by的分区accountId,因此单个帐户的登录事件将进入同一分片。

如果accountId在输入主题中输入的数据已经按进行分区,则无需执行任何操作。如果没有,则可以控制上游应用程序,最简单的方法是在上游的应用程序生产者中使用自定义分区程序来获取所需的分区。如果您不能更改上游应用程序,则需要在将accountIdas设置为新键之后对数据进行重新分区,即,through()在调用之前进行操作transform()

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Kafka Streams如何获取kafka标头

来自分类Dev

kafka-streams警告kafka连接故障

来自分类Dev

停止Kafka Streams应用

来自分类Dev

Kafka Streams KGroupedTable恢复

来自分类Dev

Kafka Streams JoinWindow 的数据

来自分类Dev

与Kafka合并事件

来自分类Dev

Kafka Streams:动态配置RocksDb

来自分类Dev

Kafka Streams - 旧状态聚合

来自分类Dev

Kafka Streams 检测丢失的记录

来自分类Dev

Kafka Streams transform() 状态存储

来自分类Dev

Kafka 中的事件处理确认

来自分类Dev

Kafka-Streams-加入前过滤GlobalKTable

来自分类Dev

Kafka Streams中数据混乱的原因

来自分类Dev

永久Kafka Streams / KSQL保留策略

来自分类Dev

Kafka Streams K-Table大小监控

来自分类Dev

Kafka Streams 2.5.0需要输入主题

来自分类Dev

Kafka Streams-不可预测的聚合结果

来自分类Dev

使用 Kafka Streams 加入单独的主题?

来自分类Dev

Kafka Streams 中的消息键长

来自分类Dev

Kafka Streams:GlobalStore 线程安全吗?

来自分类Dev

无法安装 npm 包 (kafka-streams)

来自分类Dev

Kafka Streams:处理来自不同分区的消息时的事件时间偏斜

来自分类Dev

通过Kafka与mysql事件进行复制

来自分类Dev

在Kafka上执行事件源架构

来自分类Dev

如何从数据库创建kafka事件?

来自分类Dev

Kafka源流上的事件时间窗口

来自分类Dev

通过Kafka与mysql事件进行复制

来自分类Dev

如何通过Spark从Kafka至少获取N个日志?

来自分类Dev

带有 Kafka Streams 的 Scala Embedded Kafka 中的生产者错误