Delay item emission until an item is emitted from another observable

Pavel Dudka

I'm playing with RxJava and stumbled upon the following problem:

I have 2 different streams:

  • Stream with items
  • Stream (with just 1 item) which emits transformation information for the first stream.

So essentially I have a stream of items, and I want each of those items to be combined with the single item from the second stream:

----a1----a2----a3----a4----a5----|--------------->

-------------b1--|----------------------------------->

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

------------a1b1-a2b1-a3b1-a4b1-a5b1-------->

It looks really similar to the combineLatest operator, but combineLatest will ignore all items from the first stream except the one closest to the item from the second stream. This means I will not receive a1b1; the first resulting item emitted will be a2b1.
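To make the difference concrete, here is a small plain-Java simulation of the two rules applied to the timeline from the diagram. It is only a sketch: no RxJava is involved, and the class and method names (CombineLatestSketch, combineLatest, delayUntilOther) are made up for illustration. It assumes events arrive strictly in list order and that labels starting with "a" belong to the first stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CombineLatestSketch {

    // combineLatest-style rule: emit only once both sides have a value,
    // always pairing the most recent value from each side.
    static List<String> combineLatest(List<String> timeline) {
        List<String> out = new ArrayList<>();
        String latestA = null;
        String latestB = null;
        for (String event : timeline) {
            if (event.startsWith("a")) {
                latestA = event;
            } else {
                latestB = event;
            }
            if (latestA != null && latestB != null) {
                out.add(latestA + latestB);
            }
        }
        return out;
    }

    // Desired rule: hold back first-stream items until the second stream's
    // single item arrives, then pair every item (buffered or later) with it.
    static List<String> delayUntilOther(List<String> timeline) {
        List<String> out = new ArrayList<>();
        List<String> pending = new ArrayList<>();
        String b = null;
        for (String event : timeline) {
            if (event.startsWith("a")) {
                if (b == null) {
                    pending.add(event);   // second stream has not emitted yet
                } else {
                    out.add(event + b);
                }
            } else if (b == null) {
                b = event;
                for (String a : pending) { // flush everything that was held back
                    out.add(a + b);
                }
                pending.clear();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> timeline = Arrays.asList("a1", "a2", "b1", "a3", "a4", "a5");
        System.out.println(combineLatest(timeline));   // [a2b1, a3b1, a4b1, a5b1]
        System.out.println(delayUntilOther(timeline)); // [a1b1, a2b1, a3b1, a4b1, a5b1]
    }
}
```

The first rule loses a1 because a2 has already replaced it by the time b1 arrives; the second rule is the behavior I am looking for.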

I also looked at the delay operator, but it doesn't allow me to specify a closing stream the way the buffer operator does.

Is there any fancy operator which solves the problem above?

Vladimir Mironov

AFAIK, there is no built-in operator that achieves the behavior you've described. You can always implement a custom operator or build it on top of existing operators. I think the second option is easier to implement, and here is the code:

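// RxJava 1.x imports
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.ConnectableObservable;
import rx.subscriptions.SerialSubscription;

public static <L, R, T> Observable<T> zipper(final Observable<? extends L> left, final Observable<? extends R> right, final Func2<? super L, ? super R, ? extends T> function) {
    // defer gives every subscriber its own replay cache and connection
    return Observable.defer(new Func0<Observable<T>>() {
        @Override
        public Observable<T> call() {
            // holds the connection to the right observable so it can be released later
            final SerialSubscription subscription = new SerialSubscription();
            // replay() caches the items of right for every later subscriber
            final ConnectableObservable<? extends R> cached = right.replay();

            // each left item waits for the cached right item(s) and is combined with them
            return left.flatMap(new Func1<L, Observable<T>>() {
                @Override
                public Observable<T> call(final L valueLeft) {
                    return cached.map(new Func1<R, T>() {
                        @Override
                        public T call(final R valueRight) {
                            return function.call(valueLeft, valueRight);
                        }
                    });
                }
            }).doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    // start the right observable as soon as the result is subscribed to
                    subscription.set(cached.connect());
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    // disconnect from the right observable when the result is unsubscribed
                    subscription.unsubscribe();
                }
            });
        }
    });
}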

If you have any questions regarding the code, I can explain it in detail.
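For readers who want to experiment with the idea without RxJava on the classpath, here is a loose plain-Java analogy. It is only a sketch under stated assumptions: a CompletableFuture stands in for the single-item right observable, a List stands in for the left stream, and the names SingleValueCombiner and combine are hypothetical. It does not reproduce the asynchronous behavior of the left stream; it only shows every left item waiting for, and then sharing, one right value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

public class SingleValueCombiner {

    // Pairs every left item with the one right value. The right value lives in
    // a single CompletableFuture, so it is produced once and shared, loosely
    // mirroring the role of right.replay() in the code above.
    static <L, R, T> List<CompletableFuture<T>> combine(
            List<L> leftItems,
            CompletableFuture<R> right,
            BiFunction<? super L, ? super R, ? extends T> function) {
        List<CompletableFuture<T>> results = new ArrayList<>();
        for (L left : leftItems) {
            // thenApply defers the combination until the right value exists
            CompletableFuture<T> combined = right.thenApply(r -> function.apply(left, r));
            results.add(combined);
        }
        return results;
    }

    public static void main(String[] args) {
        CompletableFuture<String> right = new CompletableFuture<>();
        List<CompletableFuture<String>> results =
                combine(Arrays.asList("a1", "a2", "a3"), right, (a, b) -> a + b);

        // Nothing is available yet because the right value has not arrived
        System.out.println(results.get(0).isDone()); // false

        right.complete("b1");
        for (CompletableFuture<String> result : results) {
            System.out.println(result.join()); // a1b1, a2b1, a3b1
        }
    }
}
```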

UPDATE

Regarding the question of how my solution differs from the following one:

left.flatMap(valueLeft -> right.map(valueRight -> together(valueLeft, valueRight)));

  1. Parallel execution - in my implementation both the left and right observables execute in parallel. The right observable doesn't have to wait for the left one to emit its first item.
  2. Caching - my solution subscribes to the right observable only once and caches its result. That's why b1 will always be the same for all aXXX items. The solution provided by akarnokd subscribes to the right observable every time the left one emits an item. That means:

    • There is no guarantee that b1 won't change its value. For example, with the following observable you will get a different b for each a:

      final Observable<Double> right = Observable.defer(new Func0<Observable<Double>>() {
          @Override
          public Observable<Double> call() {
              return Observable.just(Math.random());
          }
      });
      
    • If the right observable is a time-consuming operation (e.g. a network call), you will have to wait for its completion every time the left observable emits a new item.
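The caching difference can be illustrated without RxJava. The sketch below is an analogy, not the actual operators: a Supplier plays the role of a cold right observable, a call to get() plays the role of a subscription, and a counter is used as a deterministic stand-in for Math.random(). The names CachingSketch, resubscribeEachTime and subscribeOnceAndCache are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class CachingSketch {

    // Re-evaluates the right source for every left item, like subscribing
    // to a cold observable once per item.
    static List<String> resubscribeEachTime(List<String> left, Supplier<Integer> right) {
        List<String> out = new ArrayList<>();
        for (String a : left) {
            out.add(a + "-b" + right.get());
        }
        return out;
    }

    // Evaluates the right source once and reuses the value for every left item.
    static List<String> subscribeOnceAndCache(List<String> left, Supplier<Integer> right) {
        List<String> out = new ArrayList<>();
        Integer cached = right.get();
        for (String a : left) {
            out.add(a + "-b" + cached);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a1", "a2", "a3");

        // Each call to get() returns a new value: 1, 2, 3, ...
        AtomicInteger firstCounter = new AtomicInteger();
        System.out.println(resubscribeEachTime(left, firstCounter::incrementAndGet));
        // [a1-b1, a2-b2, a3-b3]

        AtomicInteger secondCounter = new AtomicInteger();
        System.out.println(subscribeOnceAndCache(left, secondCounter::incrementAndGet));
        // [a1-b1, a2-b1, a3-b1]
    }
}
```

The first variant also invokes the right source three times, which is where the repeated waiting comes from when that source is slow.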
