Scala-回调的返回值

用户名

我对scala编程还很陌生。有人可以帮我提供回调的返回值吗?如何从调用方法中以JsObject的形式返回回调值?我在演员系统中使用Play2框架。请让我知道我的返回类型是否错误,并且与SendToKafka方法中的JsObject相比,我应该返回Future。

我有以下代码

override def SendToKafka(data: JsValue): Option[JsObject] = {
  val props: Map[String, AnyRef] = Map(
    "bootstrap.servers" -> "localhost:9092",
    "group.id" -> "CountryCounter",
    "key.serializer" -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.serializer" -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "schema.registry.url" -> "http://localhost:8081"
  )

  val schema: Schema = new Parser().parse(Source.fromURL(getClass.getResource("/test.avsc")).mkString)

  val gRecord: GenericRecord = new GenericData.Record(schema)
  gRecord.put("emp_id", request.emp_id)

  val producer = new KafkaProducer[Int, GenericRecord](props.asJava)
  val record = new ProducerRecord("Emp", 1, gRecord)

  val promise = Promise[RecordMetadata]()

  producer.send(record, producerCallback(promise))
  val f = promise.future
  val returnValue : Some[JsObject] =null
  val con = Future {
    f onComplete {
      case Success(r) => accessLogger.info("r" + r.offset())
      case Failure(e) => accessLogger.info("e "+ e)
    }

    // I would like to return offset as JsObject or exception ( if any )
  }

  private def producerCallback(promise: Promise[RecordMetadata]): Callback = {
    new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {

      val result = if (exception == null) {
        //accessLogger.info("offset - " + metadata.offset())
        // I would like to return this offset as JsObject 
        Success(metadata)
      }
      else {
        accessLogger.error(exception.printStackTrace().toString)
        Failure(exception)
        // I would like to return exception (if any ) as JsObject 
      }
      promise.complete(result)
    }
  }
}
琳达

因为promise是类型Promise[RecordMetadata]fpromise.futuref是类型Future[RecordMetadata]未来将举行什么result,是promise.complete(result)

将来可能会包含失败(即Failure(exception)在您的回调中),因此需要进行处理(在使用匹配/案例的情况下)

Await.ready可以用来等到将来出现aSuccessFailure-,但是如果没有这种阻塞调用,在同一方法内,将来可能还没有完成。

import scala.concurrent.duration._
import scala.concurrent._
...
// arbitrary time -- set an appropriate wait time
val fReady: Future[RecordMetadata] = Await.ready(f, 4.seconds)

// After Await.ready is called, *up to* the duration (4s here) has elapsed and the future should have a result
// you probably need to change the return type to Either if you use this approach,
// or change this to Option type and ignore the failure, assuming that the exception is logged already 
val result: Either[Throwable, Int] = fReady.value match {
  case Some(Success(a)) => Right(a) // you can edit this to compute a JsValue from `a` if you want
  case Some(Failure(b)) => Left(b)
  case None => Left(new RuntimeException("Unexpected"))
}

// can be return type or edit this
result

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

从 Scala Future 返回值

来自分类Dev

从Python中的回调返回值

来自分类Dev

在回调中使用返回值

来自分类Dev

Ruby FFI回调返回值

来自分类Dev

从映射中的回调返回值

来自分类Dev

Javascript,从回调返回值

来自分类Dev

回调,JavaScript返回值

来自分类Dev

Scala隐式返回值问题

来自分类Dev

UDF Scala返回值[max,index]

来自分类Dev

Scala隐式返回值问题

来自分类Dev

如何从scala中的if块返回值

来自分类Dev

Scala如何定义返回值的类型?

来自分类Dev

返回值的scala多态类型

来自分类Dev

Scala - Scanleft,在迭代中返回值而不在下一回合使用它

来自分类Dev

在回调Angular JS指令中获取返回值

来自分类Dev

消息回调很少返回值-Chrome扩展程序

来自分类Dev

南希路线-回调返回值

来自分类Dev

从Meteor.method中的回调返回值

来自分类Dev

PHP回调MySQL从不返回值

来自分类Dev

带参数和返回值的JavaScript回调

来自分类Dev

NodeJS获得异步返回值(回调)

来自分类Dev

是否可以从promise通知回调中返回值?

来自分类Dev

如何从函数内的回调返回值?

来自分类Dev

在返回值之前,如何等待回调完成?

来自分类Dev

useMutation中的onCompleted回调没有返回值

来自分类Dev

皮卡:如何从回调函数获取返回值?

来自分类Dev

猫鼬从withTransaction回调返回值

来自分类Dev

Firebase UI回调中的signInSuccessWithAuthResult返回值

来自分类Dev

从Meteor.method中的回调返回值