我开始学习Elixir,遇到了我无法轻松解决的挑战。
我正在尝试创建一个函数,该函数接受一个Enumerable.t并返回另一个包含下n个项目的Enumerable.t 。它的行为与Enum.chunk(e,n,1,[])略有不同,因为数字迭代计数将始终等于原始可枚举计数。我还需要支持Streams
@spec lookahead(Enumerable.t, non_neg_integer) :: Enumerable.t
最好用doctest语法说明:
iex> lookahead(1..6, 1) |> Enum.to_list
[[1,2],[2,3],[3,4],[4,5],[5,6],[6]]
iex> lookahead(1..4, 2) |> Enum.to_list
[[1,2,3],[2,3,4],[3,4],[4]]
iex> Stream.cycle(1..4) |> lookahead(2) |> Enum.take(5)
[[1,2,3],[2,3,4],[3,4,1],[4,1,2],[1,2,3]]
iex> {:ok,io} = StringIO.open("abcd")
iex> IO.stream(io,1) |> lookahead(2) |> Enum.to_list
[["a","b","c"],["b","c","d"],["c","d"],["d"]]
我已经研究了如何实现Enumerable.t协议,但还不太了解Enumerable.reduce接口。
是否有任何简洁/优雅的方法?
我的用例是在二进制流上使用较小的固定n值(1或2),因此对于优化版本来说要加分。但是,出于学习Elixir的目的,我对跨多个用例的解决方案感兴趣。性能很重要。我将针对解决方案的n的各个值运行一些基准测试并发布。
基准更新-2015年4月8日
已发布6个可行的解决方案。有关基准测试的详细信息,请参见https://gist.github.com/spitsw/fce5304ec6941578e454。基准针对包含500个项目的n值运行。
对于n = 1,结果如下:
PatrickSuspend.lookahead 104.90 µs/op
Warren.lookahead 174.00 µs/op
PatrickChunk.lookahead 310.60 µs/op
PatrickTransform.lookahead 357.00 µs/op
Jose.lookahead 647.60 µs/op
PatrickUnfold.lookahead 1484000.00 µs/op
对于n = 50,以下结果:
PatrickSuspend.lookahead 220.80 µs/op
Warren.lookahead 320.60 µs/op
PatrickTransform.lookahead 518.60 µs/op
Jose.lookahead 1390.00 µs/op
PatrickChunk.lookahead 3058.00 µs/op
PatrickUnfold.lookahead 1345000.00 µs/op (faster than n=1)
如评论中所述,我的第一次尝试遇到了一些性能问题,并且不适用于具有副作用的流,例如IO流。我花了一些时间深入研究流库,最后想出了以下解决方案:
defmodule MyStream
def lookahead(enum, n) do
step = fn val, _acc -> {:suspend, val} end
next = &Enumerable.reduce(enum, &1, step)
&do_lookahead(n, :buffer, [], next, &1, &2)
end
# stream suspended
defp do_lookahead(n, state, buf, next, {:suspend, acc}, fun) do
{:suspended, acc, &do_lookahead(n, state, buf, next, &1, fun)}
end
# stream halted
defp do_lookahead(_n, _state, _buf, _next, {:halt, acc}, _fun) do
{:halted, acc}
end
# initial buffering
defp do_lookahead(n, :buffer, buf, next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
new_state = if length(buf) < n, do: :buffer, else: :emit
do_lookahead(n, new_state, buf ++ [val], next, {:cont, acc}, fun)
{_, _} ->
do_lookahead(n, :emit, buf, next, {:cont, acc}, fun)
end
end
# emitting
defp do_lookahead(n, :emit, [_|rest] = buf, next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
do_lookahead(n, :emit, rest ++ [val], next, fun.(buf, acc), fun)
{_, _} ->
do_lookahead(n, :emit, rest, next, fun.(buf, acc), fun)
end
end
# buffer empty, halting
defp do_lookahead(_n, :emit, [], _next, {:cont, acc}, _fun) do
{:halted, acc}
end
end
乍一看这可能令人生畏,但实际上并不难。我会尽力为您分解它,但是对于像这样的完整示例来说,这很难。
让我们从一个更简单的示例开始:一个不断重复赋予它的值的流。为了发出流,我们可以返回一个以累加器和一个函数为参数的函数。要发出一个值,我们用两个参数调用该函数:要发出的值和累加器。acc
累加器是由一个命令(一个元组:cont
,:suspend
或:halt
),并告诉我们消费者想要我们做的; 我们需要返回的结果取决于操作。如果应该暂停流,我们将返回一个原子的三元素元组:suspended
,累加器和一个在枚举继续时将被调用的函数(有时称为“ continuation”)。对于:halt
命令,我们只需返回{:halted, acc}
,对于:cont
我们通过执行如上所述的递归步骤来发出一个值。整个事情看起来像这样:
defmodule MyStream do
def repeat(val) do
&do_repeat(val, &1, &2)
end
defp do_repeat(val, {:suspend, acc}, fun) do
{:suspended, acc, &do_repeat(val, &1, fun)}
end
defp do_repeat(_val, {:halt, acc}, _fun) do
{:halted, acc}
end
defp do_repeat(val, {:cont, acc}, fun) do
do_repeat(val, fun.(val, acc), fun)
end
end
现在,这只是难题的一部分。我们可以发出流,但是还没有处理传入的流。同样,为了解释它是如何工作的,构造一个简单的例子是有意义的。在这里,我将构建一个函数,该函数采用可枚举的值,只是为每个值挂起并重新发出。
defmodule MyStream do
def passthrough(enum) do
step = fn val, _acc -> {:suspend, val} end
next = &Enumerable.reduce(enum, &1, step)
&do_passthrough(next, &1, &2)
end
defp do_passthrough(next, {:suspend, acc}, fun) do
{:suspended, acc, &do_passthrough(next, &1, fun)}
end
defp do_passthrough(_next, {:halt, acc}, _fun) do
{:halted, acc}
end
defp do_passthrough(next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
do_passthrough(next, fun.(val, acc), fun)
{_, _} ->
{:halted, acc}
end
end
end
第一个子句设置next
传递给do_passthrough
函数的函数。它用于从传入流中获取下一个值的目的。内部使用的step函数定义我们暂停流中的每个项目。其余部分与最后一个子句非常相似。在这里,我们调用next函数{:cont, []}
以获取新值,并通过case语句处理结果。如果有值,我们返回{:suspended, val, next}
,否则返回,流停止,然后将其传递给使用者。
我希望可以澄清一些有关如何在Elixir中手动构建流的内容。不幸的是,使用流需要大量样板。如果lookahead
现在返回到实现,您将看到只有微小的差异,它们是真正有趣的部分。有两个附加参数:state
,用于区分:buffer
和:emit
步骤,并在初始缓冲步骤中buffer
预先填充了n+1
项目。在发射阶段,当前缓冲区被发射,然后在每次迭代时向左移动。当输入流停止或我们的流直接停止时,我们就完成了。
我将原始答案留在这里以供参考:
这是一种Stream.unfold/2
根据您的规范发出真实值流的解决方案。这意味着您需要Enum.to_list
在前两个示例的末尾添加以获得实际值。
defmodule MyStream do
def lookahead(stream, n) do
Stream.unfold split(stream, n+1), fn
{[], stream} ->
nil
{[_ | buf] = current, stream} ->
{value, stream} = split(stream, 1)
{current, {buf ++ value, stream}}
end
end
defp split(stream, n) do
{Enum.take(stream, n), Stream.drop(stream, n)}
end
end
一般的想法是我们保留先前迭代的buf。在每次迭代中,我们发出当前buf,从流中获取一个值并将其附加到buf的末尾。重复此操作,直到buf为空。
例:
iex> MyStream.lookahead(1..6, 1) |> Enum.to_list
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]
iex> MyStream.lookahead(1..4, 2) |> Enum.to_list
[[1, 2, 3], [2, 3, 4], [3, 4], [4]]
iex> Stream.cycle(1..3) |> MyStream.lookahead(2) |> Enum.take(5)
[[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]]
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句