Scala中的惰性分页(迭代器的流/迭代器?)

亚历克斯

我通过调用依次一次从数据库API逐页读取大量记录(每页记录数未知) def readPage(pageNumber: Int): Iterator[Record]

我正在尝试以一种功能性的方式(或者理想情况下,没有可变的状态)以某种方式Stream[Iterator[Record]]或以Iterator[Iterator[Record]]惰性方式包装此API ,并保持不变的内存占用,以便将其视为无限的页面流或迭代器序列,并将其抽象化来自客户的分页。客户端可以通过调用next()来迭代结果,它将检索下一页(Iterator [Record])。

在Scala中实现此功能的最惯用和有效的方法是什么。

编辑:需要一次一页提取和处理记录,不能从内存中的所有页面维护所有记录。如果一页失败,则引发异常。大量页面/记录意味着对所有实际目的都是无限的。我想将其视为页面的无限流(或迭代器),而每个页面都是有限数量的记录的迭代器(例如,小于1000,但如果有时间,则确切数量未知)。

我在Monix查看了BatchCursor,但它具有不同的用途。

编辑2:这是当前版本,使用下面Tomer的答案作为起点,但使用Stream代替Iterator。这样就可以消除按照https://stackoverflow.com/a/10525539/165130进行尾部递归的需要,并且有O(1)的时间进行流前置#::操作(而如果我们通过++操作将迭代器串联起来,则它为O (n))

注意:虽然对流进行延迟评估,但是流备忘可能仍会导致内存崩溃,并且内存管理变得棘手从更改valdef以定义def pages = readAllPages下面的Stream似乎没有任何效果

def readAllPages(pageNumber: Int = 0): Stream[Iterator[Record]] = {
   val iter: Iterator[Record] = readPage(pageNumber)
   if (iter.isEmpty)
     Stream.empty
   else
    iter #:: readAllPages(pageNumber + 1)
} 
      
//usage
val pages = readAllPages
for{
    page<-pages
    record<-page
    if(isValid(record))
}
process(record)
 

编辑3:Tomer的第二个建议似乎是最好的,它的运行时间和内存占用量与上述解决方案相似,但更简洁,更容易出错。

val pages = Stream.from(1).map(readPage).takeWhile(_.nonEmpty)

注意:Stream.from(1)API文档中创建一个从1开始并以1递增的流。

托玛谢塔

您可以尝试实现以下逻辑:

def readPage(pageNumber: Int): Iterator[Record] = ???

@tailrec
def readAllPages(pageNumber: Int): Iterator[Iterator[Record]] = {
  val iter = readPage(pageNumber)
  if (iter.nonEmpty) {
    // Compute on records
    // When finishing computing:
    Iterator(iter) ++ readAllPages(pageNumber + 1)
  } else {
    Iterator.empty
  }
}

readAllPages(0)

一个简短的选择是:

Stream.from(1).map(readPage).takeWhile(_.nonEmpty)

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章