我通过调用依次一次从数据库API逐页读取大量记录(每页记录数未知) def readPage(pageNumber: Int): Iterator[Record]
我正在尝试以一种功能性的方式(或者理想情况下,没有可变的状态)以某种方式Stream[Iterator[Record]]
或以Iterator[Iterator[Record]]
惰性方式包装此API ,并保持不变的内存占用,以便将其视为无限的页面流或迭代器序列,并将其抽象化来自客户的分页。客户端可以通过调用next()来迭代结果,它将检索下一页(Iterator [Record])。
在Scala中实现此功能的最惯用和有效的方法是什么。
编辑:需要一次一页提取和处理记录,不能从内存中的所有页面维护所有记录。如果一页失败,则引发异常。大量页面/记录意味着对所有实际目的都是无限的。我想将其视为页面的无限流(或迭代器),而每个页面都是有限数量的记录的迭代器(例如,小于1000,但如果有时间,则确切数量未知)。
我在Monix中查看了BatchCursor,但它具有不同的用途。
编辑2:这是当前版本,使用下面Tomer的答案作为起点,但使用Stream代替Iterator。这样就可以消除按照https://stackoverflow.com/a/10525539/165130进行尾部递归的需要,并且有O(1)的时间进行流前置#::
操作(而如果我们通过++
操作将迭代器串联起来,则它为O (n))
注意:虽然对流进行延迟评估,但是流备忘可能仍会导致内存崩溃,并且内存管理变得棘手。从更改val
为def
以定义def pages = readAllPages
下面的Stream似乎没有任何效果
def readAllPages(pageNumber: Int = 0): Stream[Iterator[Record]] = {
val iter: Iterator[Record] = readPage(pageNumber)
if (iter.isEmpty)
Stream.empty
else
iter #:: readAllPages(pageNumber + 1)
}
//usage
val pages = readAllPages
for{
page<-pages
record<-page
if(isValid(record))
}
process(record)
编辑3:Tomer的第二个建议似乎是最好的,它的运行时间和内存占用量与上述解决方案相似,但更简洁,更容易出错。
val pages = Stream.from(1).map(readPage).takeWhile(_.nonEmpty)
注意:Stream.from(1)
在API文档中创建一个从1开始并以1递增的流。
您可以尝试实现以下逻辑:
def readPage(pageNumber: Int): Iterator[Record] = ???
@tailrec
def readAllPages(pageNumber: Int): Iterator[Iterator[Record]] = {
val iter = readPage(pageNumber)
if (iter.nonEmpty) {
// Compute on records
// When finishing computing:
Iterator(iter) ++ readAllPages(pageNumber + 1)
} else {
Iterator.empty
}
}
readAllPages(0)
一个简短的选择是:
Stream.from(1).map(readPage).takeWhile(_.nonEmpty)
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句