我对scala编程还很陌生。有人可以帮我提供回调的返回值吗?如何从调用方法中以JsObject的形式返回回调值?我在演员系统中使用Play2框架。请让我知道我的返回类型是否错误,并且与SendToKafka方法中的JsObject相比,我应该返回Future。
我有以下代码
override def SendToKafka(data: JsValue): Option[JsObject] = {
val props: Map[String, AnyRef] = Map(
"bootstrap.servers" -> "localhost:9092",
"group.id" -> "CountryCounter",
"key.serializer" -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.serializer" -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
"schema.registry.url" -> "http://localhost:8081"
)
val schema: Schema = new Parser().parse(Source.fromURL(getClass.getResource("/test.avsc")).mkString)
val gRecord: GenericRecord = new GenericData.Record(schema)
gRecord.put("emp_id", request.emp_id)
val producer = new KafkaProducer[Int, GenericRecord](props.asJava)
val record = new ProducerRecord("Emp", 1, gRecord)
val promise = Promise[RecordMetadata]()
producer.send(record, producerCallback(promise))
val f = promise.future
val returnValue : Some[JsObject] =null
val con = Future {
f onComplete {
case Success(r) => accessLogger.info("r" + r.offset())
case Failure(e) => accessLogger.info("e "+ e)
}
// I would like to return offset as JsObject or exception ( if any )
}
private def producerCallback(promise: Promise[RecordMetadata]): Callback = {
new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
val result = if (exception == null) {
//accessLogger.info("offset - " + metadata.offset())
// I would like to return this offset as JsObject
Success(metadata)
}
else {
accessLogger.error(exception.printStackTrace().toString)
Failure(exception)
// I would like to return exception (if any ) as JsObject
}
promise.complete(result)
}
}
}
因为promise
是类型Promise[RecordMetadata]
和f
是promise.future
,f
是类型Future[RecordMetadata]
。未来将举行什么result
,是promise.complete(result)
。
将来可能会包含失败(即Failure(exception)
在您的回调中),因此需要进行处理(在使用匹配/案例的情况下)
Await.ready
可以用来等到将来出现aSuccess
或Failure
-,但是如果没有这种阻塞调用,在同一方法内,将来可能还没有完成。
import scala.concurrent.duration._
import scala.concurrent._
...
// arbitrary time -- set an appropriate wait time
val fReady: Future[RecordMetadata] = Await.ready(f, 4.seconds)
// After Await.ready is called, *up to* the duration (4s here) has elapsed and the future should have a result
// you probably need to change the return type to Either if you use this approach,
// or change this to Option type and ignore the failure, assuming that the exception is logged already
val result: Either[Throwable, Int] = fReady.value match {
case Some(Success(a)) => Right(a) // you can edit this to compute a JsValue from `a` if you want
case Some(Failure(b)) => Left(b)
case None => Left(new RuntimeException("Unexpected"))
}
// can be return type or edit this
result
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句