我正在尝试通过演员向Kafka发送消息,但该消息不起作用。
以下代码有效
new KeyedMessage[String, Array[Byte]]("my-topic", msg.message)
这不是...为什么?
new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, msg.message)
甚至
new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, null, msg.message)
将partition设置为null,强制其仅填充id,但仍然相同。
有什么想法吗?
编辑
好吧,只是意识到以字节数组形式发送消息时它不起作用。
我更改了代码,允许以字符串形式发送消息,并且可以正常工作。
使用以下命令将不起作用:
props.put("serializer.class", "kafka.serializer.DefaultEncoder")
override def receive: Receive = {
case msg: Message =>
val keyedMessage = new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, msg.message)
producer.send(keyedMessage)
case _ => log.error("Got a msg that I don't understand")
}
将Array [Byte]修改为String以及将DefaultEncoder修改为StringEncoder即可。
任何的想法?
好吧,我才发现问题所在。
实际上,编码器有两个选项。DefaultEncoder和StringEncoder。我试图使用默认编码器将id作为字符串发送,将消息作为字节数组发送。
查看Encoder.scala,您可以看到DefaultEncoder实现和答案。
/**
* The default implementation is a no-op, it just returns the same array it takes in
*/
class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] {
override def toBytes(value: Array[Byte]): Array[Byte] = value
}
它只是返回字节数组,并且由于我传递了String,所以它抛出了强制转换异常。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句