The library I'm using emits a series of Message objects through a callback object.
interface MessageCallback {
    void onMessage(Message message);
}
The callback is registered with a libraryObject.setCallback(MessageCallback)
call, and the process is started with a non-blocking libraryObject.start()
call.
What is the best way of creating an Observable<Message>
that will emit those objects?
What if libraryObject.start() is blocking?
I think you need something like this (example in Scala):
import rx.lang.scala.{Observable, Subscriber}

case class Message(message: String)

trait MessageCallback {
  def onMessage(message: Message): Unit
}

// Stand-in for the real library; the method bodies are left unimplemented.
object LibraryObject {
  def setCallback(callback: MessageCallback): Unit = {
    ???
  }
  def removeCallback(callback: MessageCallback): Unit = {
    ???
  }
  def start(): Unit = {
    ???
  }
}

def messagesSource: Observable[Message] =
  Observable((subscriber: Subscriber[Message]) => {
    // Forward every library message to the subscriber.
    val callback = new MessageCallback {
      def onMessage(message: Message): Unit = {
        subscriber.onNext(message)
      }
    }
    LibraryObject.setCallback(callback)
    // Unregister the callback when the subscriber unsubscribes.
    subscriber.add {
      LibraryObject.removeCallback(callback)
    }
  })
As for the blocking/non-blocking start(): a callback-based architecture usually separates callback subscription from starting the process. In that case you can create as many messagesSource instances as you want, completely independently of when you start() the process. Whether you fork it or not is also entirely up to you. Is your architecture different from this?
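If start() does block, one option is to register the callback first and then run start() on its own thread. Here is a rough sketch in plain Java with no RxJava dependency. FakeLibrary, its three hard-coded messages, the use of String instead of Message, and the collect() helper are all made up for illustration; your real library will differ.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class BlockingStartSketch {
    // Simplified callback: String stands in for the library's Message type.
    interface MessageCallback {
        void onMessage(String message);
    }

    // Hypothetical stand-in for the library. Its start() blocks the calling
    // thread until every message has been delivered to the callback.
    static final class FakeLibrary {
        private volatile MessageCallback callback;

        void setCallback(MessageCallback callback) {
            this.callback = callback;
        }

        void start() {
            for (int i = 1; i <= 3; i++) {
                MessageCallback cb = callback;
                if (cb != null) {
                    cb.onMessage("message " + i);
                }
            }
        }
    }

    // Registers the callback first, then forks the blocking start() call.
    static List<String> collect() {
        FakeLibrary library = new FakeLibrary();
        List<String> received = new CopyOnWriteArrayList<>();
        library.setCallback(received::add);

        Thread worker = new Thread(library::start, "library-start");
        worker.start();
        try {
            // Joining here only so the sketch can return the messages;
            // a real caller would normally carry on with other work.
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```

The point is only the ordering: subscribe first, then decide which thread pays for the blocking call.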
You should also handle the end of the process somehow. The best approach would be to add an onCompleted
handler to the MessageCallback interface. If you want to handle errors, add an onError
handler as well. Now behold, you have just declared the fundamental building block of RxJava, an Observer :-)
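To make that concrete, here is a small plain-Java sketch of what the extended callback could look like. MessageObserver, emitAll and record are hypothetical names I made up (String again stands in for Message), and treating a null message as the failure case is just an assumption for the demo. The three methods mirror the onNext/onError/onCompleted shape of RxJava's Observer.

```java
import java.util.ArrayList;
import java.util.List;

final class ObserverShapeSketch {
    // MessageCallback extended with the two terminal events.
    interface MessageObserver {
        void onMessage(String message);
        void onError(Throwable error);
        void onCompleted();
    }

    // Hypothetical emitter: delivers each message, then signals exactly one
    // terminal event, either onError or onCompleted.
    static void emitAll(List<String> messages, MessageObserver observer) {
        try {
            for (String message : messages) {
                if (message == null) {
                    throw new IllegalArgumentException("null message");
                }
                observer.onMessage(message);
            }
        } catch (RuntimeException e) {
            observer.onError(e);
            return;
        }
        observer.onCompleted();
    }

    // Records the events an observer sees, in order.
    static List<String> record(List<String> messages) {
        List<String> events = new ArrayList<>();
        emitAll(messages, new MessageObserver() {
            @Override public void onMessage(String message) {
                events.add("next:" + message);
            }
            @Override public void onError(Throwable error) {
                events.add("error:" + error.getMessage());
            }
            @Override public void onCompleted() {
                events.add("completed");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(List.of("a", "b")));
    }
}
```

With a callback of this shape, the bridge to an Observable becomes a one-to-one forwarding of each method to the subscriber.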