在Kafka Connect中将long转换为int64

用户名

我正在尝试在Kafka Connect中编写自定义的单个消息转换。从一个int64 / Date类型字段中,我正在生成一个长值,但是当我试图用相同的模式(int64类型)将它写回updatedValue时,却遇到了一个问题:

    for (Field field : value.schema().fields()) {
        final Object origFieldValue = value.get(field);
        if (timeField.equals(field.name())){
            long date = convertDate(origFieldValue);
            updatedValue.put(field, date);
        }
        updatedValue.put(field, origFieldValue);

    }

错误是:

[2020-05-14 14:31:52,120] INFO WorkerSourceTask{id=datechanger} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
connect_1    | [2020-05-14 14:31:52,120] INFO WorkerSourceTask{id=datechanger-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
connect_1    | [2020-05-14 14:31:52,120] ERROR WorkerSourceTask{id=datechanger-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
connect_1    | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect_1    |  at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
connect_1    |  at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect_1    |  at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:320)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
connect_1    |  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1    |  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1    |  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect_1    |  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect_1    |  at java.lang.Thread.run(Thread.java:748)
connect_1    | Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.lang.Long for field: "date"

是否有机会将该long值转换回int64,使其适合模式

丹尼斯·科罗比辛(Denis Korobitsin)

我遇到过同样的问题。对我来说,问题是我Date.SCHEMA从kafka.connect.data包中指定了一个架构。

这样,您就不需要传递Long值,只需传递java.util.Date本身即可。模式验证器将解决该问题。

我通过在这里挖掘源代码发现了这一点-https: //github.com/apache/kafka/blob/2.6/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L69 https://github.com/apache/kafka/blob/2.6/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L271

这里的问题是,它LOGICAL_TYPE_CLASSES之前找到列表SCHEMA_TYPE_CLASSES,因此尝试将其视为日期。错误消息只是误导。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

在C ++中将Java long转换为int64

来自分类Dev

在python列中将这些对象转换为int64

来自分类Dev

如何在golang中将int(int64)转换为uint16?

来自分类Dev

如何在Go中将int64转换为int?

来自分类Dev

如何从double转换为Int64

来自分类Dev

无法从“ _int64”转换为“数据*”

来自分类Dev

如何从double转换为Int64

来自分类Dev

Kafka主题与Kafka Connect合并到HDFS

来自分类Dev

Kafka和Kafka Connect部署环境

来自分类Dev

如何使用 Flume 将主题、kafka 转换为 kafka?

来自分类Dev

如何在go中将int64转换为字节数组?

来自分类Dev

在Pandas中将文本转换为int64类别

来自分类Dev

在Excel中将文本数据转换为int64格式

来自分类Dev

在javascript中将字节数组转换为带符号的int64

来自分类Dev

如何在Julia中将DataFrame的DateTime元素转换为Int64毫秒?

来自分类Dev

如何在Perl中将字节数组转换为int64

来自分类Dev

如何防止Long / Int64 ToString()转换为指数格式?

来自分类Dev

Kafka Connect实施错误

来自分类Dev

Kafka Connect 中的动态消息转换

来自分类Dev

如何将NSNumber转换/转换为Int64?

来自分类Dev

将盒装 int64 转换为 int

来自分类Dev

在Java中将long转换为int

来自分类Dev

如何在python中将列数据类型int64转换为分类列数据类型?

来自分类常见问题

Golang将字符串转换为int64

来自分类Dev

将[Int64]的Swift数组转换为NSArray

来自分类Dev

从int64转换为字节数组

来自分类Dev

将布尔列表转换为_int64

来自分类Dev

将Int64转换为UUID

来自分类Dev

将 DateTimeOffset 转换为 Int64 并返回到 DateTimeOffset