How to filter in ReactiveX too frequent onSubscribe requests

Rostyslav Roshak

I have an observable that performs time consuming network operations. The client code might subscribe frequently to the observable which leads to a high network load.

Since we can't control when a subscriber appears this has to be done on the observable side.

Dave Moten

When the number of concurrent subscribers is at a maximum you want further subscribers to receive an empty stream.

Given a source that you want to limit subscriptions for, do this:

Observable<T> limited = source.compose(
        new TransformerLimitSubscribers<T>(
            new AtomicInteger(), maxSubscribers))
     .onErrorResumeNext(Observable.<T>empty());
...
limited.subscribe(s1);
...
limited.subscribe(s2); 

where the transfomer is defined by this class:

public final class TransformerLimitSubscribers<T> implements Transformer<T, T> {

    private final AtomicInteger subscriberCount;
    private final int maxSubscribers;

    public TransformerLimitSubscribers(AtomicInteger subscriberCount, int maxSubscribers) {
        this.subscriberCount = subscriberCount;
        this.maxSubscribers = maxSubscribers;
    }

    @Override
    public Observable<T> call(Observable<T> o) {
        return o.doOnSubscribe(onSubscribe()).doOnUnsubscribe(onUnsubscribe());
    }

    private Action0 onSubscribe() {
        return new Action0() {

            @Override
            public void call() {
                if (subscriberCount.incrementAndGet() > maxSubscribers)
                    throw new TooManySubscribersException();
            }
        };
    }

    private Action0 onUnsubscribe() {
        return new Action0() {

            @Override
            public void call() {
                subscriberCount.decrementAndGet();
            }
        };
    }

    public static class TooManySubscribersException extends RuntimeException {
    }

}

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

From Dev

Pandas: Filter dataframe for values that are too frequent or too rare

From Dev

pandas - how to filter "most frequent" Datetime objects

From Dev

How to check if a request is taking too long in requests?

From Dev

How to check if a request is taking too long in requests?

From Dev

How does/frequent unix tee command write stdout terminal output to file? if the output is too big

From Dev

How to cleanup on Observable created with the .create(OnSubscribe) method

From Dev

How to filter pull requests on GitHub by commentaries authors?

From Dev

How to filter requests in EJB based web applications?

From Dev

How to filter pull requests on GitHub by commentaries authors?

From Dev

ReactiveX JS and TypeScript - How to unsubscribe?

From Dev

Getting too frequent location updates in the background

From Dev

Too frequent load of Quartz Scheduler in a Spring application

From Dev

Too frequent disk checks on Ubuntu 12.04

From Dev

Frequent 2 minutes pauses on web browser requests

From Dev

How to prevent 429 (Too Many Requests) error using setTimeout

From Dev

Too many OPTIONS requests

From Dev

How to deal with frequent classes?

From Dev

NPE while filter list with io.reactivex.Observable

From Dev

SPARK SQL : How to filter records by multiple colmuns and using groupBy too

From Dev

How to use exhaustMap in ReactiveX/rxjs 5 in TypeScript

From Dev

How to get pull requests filter with owner using Github API

From Dev

How can a Laravel package filter all requests and redirect if necessary?

From Dev

How can a Laravel package filter all requests and redirect if necessary?

From Dev

How to filter meta-tags in json response with Python Requests?

From Dev

How to filter HTTP requests based on body before they get to the server?

From Dev

await too slow for frequent events; can async methods ignore cancellations?

From Dev

await too slow for frequent events; can async methods ignore cancellations?

From Dev

Ajax making too many requests

From Dev

Is it better to keep a socket open for frequent requests, or to close the socket each time

Related Related

  1. 1

    Pandas: Filter dataframe for values that are too frequent or too rare

  2. 2

    pandas - how to filter "most frequent" Datetime objects

  3. 3

    How to check if a request is taking too long in requests?

  4. 4

    How to check if a request is taking too long in requests?

  5. 5

    How does/frequent unix tee command write stdout terminal output to file? if the output is too big

  6. 6

    How to cleanup on Observable created with the .create(OnSubscribe) method

  7. 7

    How to filter pull requests on GitHub by commentaries authors?

  8. 8

    How to filter requests in EJB based web applications?

  9. 9

    How to filter pull requests on GitHub by commentaries authors?

  10. 10

    ReactiveX JS and TypeScript - How to unsubscribe?

  11. 11

    Getting too frequent location updates in the background

  12. 12

    Too frequent load of Quartz Scheduler in a Spring application

  13. 13

    Too frequent disk checks on Ubuntu 12.04

  14. 14

    Frequent 2 minutes pauses on web browser requests

  15. 15

    How to prevent 429 (Too Many Requests) error using setTimeout

  16. 16

    Too many OPTIONS requests

  17. 17

    How to deal with frequent classes?

  18. 18

    NPE while filter list with io.reactivex.Observable

  19. 19

    SPARK SQL : How to filter records by multiple colmuns and using groupBy too

  20. 20

    How to use exhaustMap in ReactiveX/rxjs 5 in TypeScript

  21. 21

    How to get pull requests filter with owner using Github API

  22. 22

    How can a Laravel package filter all requests and redirect if necessary?

  23. 23

    How can a Laravel package filter all requests and redirect if necessary?

  24. 24

    How to filter meta-tags in json response with Python Requests?

  25. 25

    How to filter HTTP requests based on body before they get to the server?

  26. 26

    await too slow for frequent events; can async methods ignore cancellations?

  27. 27

    await too slow for frequent events; can async methods ignore cancellations?

  28. 28

    Ajax making too many requests

  29. 29

    Is it better to keep a socket open for frequent requests, or to close the socket each time

HotTag

Archive