Scala + Kafka不发送ID为的消息

蒂亚戈·佩雷拉(Thiago Pereira)

我正在尝试通过演员向Kafka发送消息,但该消息不起作用。

以下代码有效

new KeyedMessage[String, Array[Byte]]("my-topic", msg.message)

这不是...为什么?

new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, msg.message)

甚至

new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, null, msg.message)

将partition设置为null,强制其仅填充id,但仍然相同。

有什么想法吗?

编辑

好吧,只是意识到以字节数组形式发送消息时它不起作用。

我更改了代码,允许以字符串形式发送消息,并且可以正常工作。

使用以下命令将不起作用:

  props.put("serializer.class", "kafka.serializer.DefaultEncoder")

  override def receive: Receive = {
    case msg: Message =>
      val keyedMessage = new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, msg.message)
      producer.send(keyedMessage)

    case _ => log.error("Got a msg that I don't understand")
  }

将Array [Byte]修改为String以及将DefaultEncoder修改为StringEncoder即可。

任何的想法?

蒂亚戈·佩雷拉(Thiago Pereira)

好吧,我才发现问题所在。

实际上,编码器有两个选项。DefaultEncoder和StringEncoder。我试图使用默认编码器将id作为字符串发送,将消息作为字节数组发送。

查看Encoder.scala,您可以看到DefaultEncoder实现和答案。

/**
 * The default implementation is a no-op, it just returns the same array it takes in
 */
class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] {
  override def toBytes(value: Array[Byte]): Array[Byte] = value
}

它只是返回字节数组,并且由于我传递了String,所以它抛出了强制转换异常。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Scala演员:未收到消息

来自分类Dev

Objective-C协议不发送消息

来自分类Dev

晦涩的Scala错误消息

来自分类Dev

Scala对象的唯一ID

来自分类Dev

Scala的元组为Right

来自分类Dev

PHPMailer-某些SMTP消息不发送

来自分类Dev

标为“类型”的scala方法

来自分类Dev

Scala消息传递顺序

来自分类Dev

()与{}的Scala

来自分类Dev

Scala向自己发送消息是好是坏?

来自分类Dev

在不发送消息的情况下获取GCM规范注册ID

来自分类Dev

错误消息未显示Scala播放表单

来自分类Dev

Scala Playframework发送文件

来自分类Dev

Akka Scala TestKit测试PoisonPill消息

来自分类Dev

SimpMessagingTemplate在Spring Boot中不发送消息

来自分类Dev

Vertx Scala:如何设置Websocket连接并发送消息

来自分类Dev

Scala akka类型:如何从实例获取ActorRef到Actor并发送消息本身?

来自分类Dev

Spring Integration TCP不发送消息

来自分类Dev

为Scala地图增值

来自分类Dev

Java套接字不发送/接收消息

来自分类Dev

在Scala中为Binary的整数

来自分类Dev

在Scala中发送HTTP请求

来自分类Dev

标为“类型”的scala方法

来自分类Dev

使用Play Framework 2.3.6的Scala:向所有套接字客户端发送消息

来自分类Dev

不发送消息,主题或来自谁?

来自分类Dev

“?:”运算符为scala

来自分类Dev

如何使用Scala将消息正确发送到Amazon SMS队列?

来自分类Dev

Java/Scala Kafka Producer 不向主题发送消息

来自分类Dev

如何使用 fcm(firebase 云消息传递)和 Scala Play 框架发送通知?