我遇到一种情况,正在创建包含数据库结果的Observable。然后,我对它们应用了一系列过滤器。然后,我有一个正在记录结果的订户。可能是没有元素通过过滤器的情况。我的业务逻辑指出这不是错误。但是,发生这种情况时,将调用onError并包含以下异常:java.util.NoSuchElementException: Sequence contains no elements
公认的做法是仅检测该类型的异常并忽略它吗?还是有更好的方法来解决这个问题?
版本是1.0.0。
这是一个简单的测试用例,它展示了我所看到的。这似乎与在到达地图并减少事件之前对所有事件进行过滤有关。
@Test
public void test()
{
Integer values[] = new Integer[]{1, 2, 3, 4, 5};
Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).map(new Func1<Integer, String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).reduce(new Func2<String, String, String>()
{
@Override
public String call(String s, String s2)
{
return s + "," + s2;
}
})
.subscribe(new Action1<String>()
{
@Override
public void call(String s)
{
System.out.println(s);
}
});
}
因为我使用的是安全订阅者,所以最初会引发一个OnErrorNotImplementedException,该异常包装以下异常:
java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)
根据以下@davem的答案,我创建了一个新的测试用例:
@Test
public void testFromBlockingAndSingle()
{
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).map(new Func1<Integer, String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).reduce(new Func2<String, String, String>()
{
@Override
public String call(String s, String s2)
{
return s + "," + s2;
}
}).toList().toBlocking().single();
System.out.println("Test: " + results + " Size: " + results.size());
}
并且此测试导致以下行为:
当输入为:
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
然后结果(如预期的那样)是:
Test: [-2,-1] Size: 1
当输入为:
Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};
然后结果是以下堆栈跟踪:
java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:441)
at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
at EmptyTest2.test(EmptyTest2.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
因此看来,问题肯定出在使用reduce函数上。请参阅以下处理两种情况的测试用例:
@Test
public void testNoReduce()
{
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).map(new Func1<Integer, String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).toList().toBlocking().first();
Iterator<String> itr = results.iterator();
StringBuilder b = new StringBuilder();
while (itr.hasNext())
{
b.append(itr.next());
if (itr.hasNext())
b.append(",");
}
System.out.println("Test NoReduce: " + b);
}
输入以下内容:
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
我得到以下预期结果:
Test NoReduce: -2,-1
并输入以下内容:
Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};
我得到以下预期的输出:
Test NoReduce:
因此,除非我完全误解了什么,否则真正处理零元素Observable的唯一方法是在映射后跟reduce的情况下由过滤器生成的结果是在Observable链之外实现reduce逻辑。你们都同意那句话吗?
最终解决方案
这是实现TomášDvořák和David Motten所建议的最终解决方案。我认为这种解决方案是合理的。
@Test
public void testWithToList()
{
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).toList().map(new Func1<List<Integer>, String>()
{
@Override
public String call(List<Integer> integers)
{
Iterator<Integer> intItr = integers.iterator();
StringBuilder b = new StringBuilder();
while (intItr.hasNext())
{
b.append(intItr.next());
if (intItr.hasNext())
{
b.append(",");
}
}
return b.toString();
}
}).subscribe(new Action1<String>()
{
@Override
public void call(String s)
{
System.out.println("With a toList: " + s);
}
});
}
给定以下输入时,此测试的行为如下。
当给定一个将具有一些值的流通过过滤器时:
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
结果是:
With a toList: -2,-1
当给出没有任何值的流时,将通过过滤器:
Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};
结果是:
With a toList: <empty string>
现在,更新之后,错误非常明显。Reduce
在RxJava中,IllegalArgumentException
如果可观察到的可减少对象完全符合规范(http://reactivex.io/documentation/operators/reduce.html),则将失败并显示。
在函数式编程中,通常有两个泛型运算符将一个集合聚合为一个值,fold
和reduce
。在公认的术语中,fold
采用初始累加器值,而函数采用运行中的累加器和集合中的值并产生另一个累加器值。伪代码示例:
[1, 2, 3, 4].fold(0, (accumulator, value) => accumulator + value)
将以0开头,最后将1,2,3,4加到正在运行的累加器,最后得出10,即这些值的总和。
Reduce非常相似,只是它没有明确地获取初始累加器值,而是将第一个值用作初始累加器,然后累加所有剩余值。如果您正在寻找最小值或最大值,则这很有意义。
[1, 2, 3, 4].reduce((accumulator, value) => min(accumulator, value))
从不同的角度看待折叠和缩小,您可能会fold
在任何时候使用聚合值,即使对于空集合(例如in sum
,0也有意义),reduce
否则将使用(否则minimum
对空集合没有意义,并且reduce
将无法对之进行操作)这样的集合,在您的情况下则抛出异常)。
您正在执行类似的聚合,用逗号将字符串集合散布成一个字符串。那是更加困难的情况。在一个空的集合上可能很有意义(您可能期望一个空的字符串),但是另一方面,如果您从一个空的累加器开始,结果中的逗号将比您预期的多。正确的解决方案是先检查集合是否为空,然后为空集合返回一个后备字符串,或者reduce
对非空集合执行。您可能会观察到,在空的收集情况下,您通常实际上并不想要一个空的字符串,但是“收集为空”之类的东西可能更合适,因此可以进一步确保此解决方案是干净的。
顺便说一句,我在这里使用单词集合,而不是自由观察,仅出于教育目的。另外,在RxJava中,两者fold
和reduce
都称为相同的,reduce
只有该方法的两个版本,一个仅采用一个参数,另外两个采用参数。
至于最后一个问题:您不必离开Observable链。就像David Motten建议的那样,只需使用toList()即可。
.filter(...)
.toList()
.map(listOfValues => listOfValues.intersperse(", "))
如果还没有库函数,那么intersperse
可以在哪里实现reduce
(这是很常见的)。
collection.intersperse(separator) =
if (collection.isEmpty())
""
else
collection.reduce(accumulator, element => accumulator + separator + element)
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句