Composable Collectable in Elixir
Collectable
If you are not familiar with Collectable, check out the Hex Docs for a more detailed introduction.
If the functions in Enumerable are about taking values out, then Collectable is about collecting those values into a structure.
Enumerable is about the act of pulling values out of something, while Collectable is about the act of pushing values into something. The emphasis is on the act. The something itself is not important: it might be a list, a map, a TCP connection or a file on disk. Elixir’s Enumerable and Collectable protocols are an implementation of this abstraction.
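To make the idea concrete, here is a small sketch using only the standard library. It first pushes values into a few built-in collectables with Enum.into/2, then drives the Collectable protocol by hand to show the accumulator and collector function it returns. The variable names are only for illustration.

```elixir
# Enum.into/2 pushes every element of an enumerable into a collectable.
list = Enum.into(1..3, [])
map = Enum.into([a: 1, b: 2], %{})
string = Enum.into(["foo", "bar"], "")

# Under the hood, Collectable.into/1 returns the initial accumulator and a
# collector function that understands {:cont, elem}, :done and :halt.
{acc, collector} = Collectable.into(MapSet.new())
acc = collector.(acc, {:cont, 1})
acc = collector.(acc, {:cont, 2})
set = collector.(acc, :done)

IO.inspect({list, map, string, set})
```

The same three steps (get an accumulator, push values, signal completion) show up again in the push container we build later.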
In this article we will see how to make Collectable (the act of pushing something) composable, and why we should care about it. We also address a unique readability issue related to Collectable, and keep back-pressure and error handling in check while building it.
Why do we need Collectable to be composable?
We need it to be composable for the same reason we need the Enum and Stream modules to work with Enumerable. Currently there are no equivalent modules to compose Collectable values.
Let’s take an example to illustrate this better.
Suppose we have an infinite stream of logs. Each line in the stream has a level: info, warn or error. Our task is to push the logs somewhere, and this somewhere is unknown at this point. Not only does the destination need to be configurable, we also need complete control over how the logs are pushed. We might want to push logs to AWS S3 or to a file on disk, print error logs to the console and skip the rest, or buffer logs before making expensive network calls.
The log stream might look like this:
log_stream =
  Stream.repeatedly(fn ->
    random_str = Base.encode64(:crypto.strong_rand_bytes(5))

    if :rand.uniform_real() < 0.5 do
      "warn: #{random_str}"
    else
      "error: #{random_str}"
    end
  end)

# `sink` can be AWS S3, a file on disk, a list or a string
# Stream.into(log_stream, sink)
How would you model this keeping the flexibility in mind?
Cases like these are the reason why we need Collectable to be composable. By the end of the post we should be able to solve this more cleanly using composable collectables.
Push Container
Elixir’s Collectable protocol is an implementation of the idea. To avoid getting lost in the details of the protocol, let’s build our own push container from the bottom up. We will get back to the Collectable protocol after a while.
So what we want is something we can push stuff into. We don’t care about the implementation details of this something.
Let’s represent this aspect using a function called push. We call push with “where we want to push” as the first argument and “the thing we want to push” as the second argument. So if something wants to be a push container, it only has to implement the push(something, anything) function.
defmodule PushContainer do
  # string container
  def push(string, value) when is_binary(string) do
    string <> value
  end

  # list container
  def push(list, value) when is_list(list) do
    list ++ [value]
  end

  # void container
  def push(nil, _) do
    nil
  end
end
defmodule Example do
  import PushContainer

  def random_binaries(container, count) do
    Enum.reduce(1..count, container, fn _, container ->
      bin = :crypto.strong_rand_bytes(5)
      push(container, bin)
    end)
  end
end

import Example

# returns values as a single binary
random_binaries(<<>>, 10) |> IO.inspect(label: "binary")
# returns values as a list of binaries
random_binaries([], 10) |> IO.inspect(label: "list")
# discards the values and returns nil (similar to /dev/null)
random_binaries(nil, 10) |> IO.inspect(label: "nil")
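The guard-based module above has to know every container type up front. As an aside, the same "only implement push" contract could also be sketched as a protocol, so that new container types can opt in from the outside. The Pushable protocol below is hypothetical and exists only for this illustration; it is not part of Elixir or of the rest of this article.

```elixir
# Hypothetical protocol, named Pushable for this sketch only.
defprotocol Pushable do
  @doc "Pushes `value` into `container` and returns the updated container."
  def push(container, value)
end

defimpl Pushable, for: BitString do
  def push(string, value), do: string <> value
end

defimpl Pushable, for: List do
  def push(list, value), do: list ++ [value]
end

# nil is an atom, so the void container dispatches to the Atom implementation.
defimpl Pushable, for: Atom do
  def push(nil, _value), do: nil
end

values = ["a", "b", "c"]
binary = Enum.reduce(values, "", fn v, acc -> Pushable.push(acc, v) end)
list = Enum.reduce(values, [], fn v, acc -> Pushable.push(acc, v) end)
void = Enum.reduce(values, nil, fn v, acc -> Pushable.push(acc, v) end)

IO.inspect({binary, list, void})
```

We stay with plain functions in the rest of the post because they are easier to wrap and compose.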
A container here does not have to be a data structure. It can be a process, a GenServer, a socket or a file. While a data structure only needs a push function to work, resources also need a way to be initialised and released. For example, to create a file container, the first step (initialise) is to open the file, then we push values, and at the end we close it (release). So let’s add support for these stages. While at it, let’s group all three functions into a module so it is easier to build containers for different types.
defmodule PushContainer do
  defstruct [:acc, :init, :func, :finish]

  @type acc :: any
  @type t :: %__MODULE__{}

  # init: creates the initial accumulator (for example, opens a file)
  # func: pushes one value into the accumulator
  # finish: releases the resource and returns the final result
  @spec build((() -> acc), (any, acc -> acc), (acc -> any)) :: t()
  def build(init, func, finish) do
    %__MODULE__{init: init, func: func, finish: finish}
  end

  @spec init(t()) :: t()
  def init(%PushContainer{init: init} = pc) do
    %PushContainer{pc | acc: init.(), init: :done}
  end

  @spec push(t(), any) :: t()
  def push(%PushContainer{acc: acc, func: func} = pc, value) do
    %PushContainer{pc | acc: func.(value, acc)}
  end

  @spec finish(t()) :: any
  def finish(%PushContainer{acc: acc, finish: finish}) do
    finish.(acc)
  end
end
# change the example to adopt this new interface
defmodule Example do
  def random_binaries(%PushContainer{} = pc, count) do
    pc = PushContainer.init(pc)

    pc =
      Enum.reduce(1..count, pc, fn _, pc ->
        bin = :crypto.strong_rand_bytes(5)
        PushContainer.push(pc, bin)
      end)

    PushContainer.finish(pc)
  end
end
Let’s build a few push containers using the new interface:
binary_pc =
  PushContainer.build(
    fn -> <<>> end,
    fn val, acc -> acc <> val end,
    fn acc -> acc end
  )

list_pc =
  PushContainer.build(
    fn -> [] end,
    fn val, acc -> acc ++ [val] end,
    fn acc -> acc end
  )

file_pc =
  PushContainer.build(
    fn -> File.open!("somefile.txt", [:write, :binary]) end,
    fn val, file ->
      :ok = IO.binwrite(file, val)
      file
    end,
    fn file -> File.close(file) end
  )

import Example

random_binaries(binary_pc, 10) |> IO.inspect(label: :binary)
random_binaries(list_pc, 10) |> IO.inspect(label: :list)
:ok = random_binaries(file_pc, 10)
File.read!("somefile.txt")
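We said a container can also be a process. Here is a minimal sketch of that, assuming an Agent is an acceptable stand-in for "some process that owns the state". The agent_pc name is hypothetical, and the PushContainer module is repeated in compact form only so the snippet runs on its own.

```elixir
# Compact copy of the PushContainer module above, so the sketch is self-contained.
defmodule PushContainer do
  defstruct [:acc, :init, :func, :finish]

  def build(init, func, finish), do: %__MODULE__{init: init, func: func, finish: finish}
  def init(%__MODULE__{init: init} = pc), do: %__MODULE__{pc | acc: init.(), init: :done}
  def push(%__MODULE__{acc: acc, func: func} = pc, value), do: %__MODULE__{pc | acc: func.(value, acc)}
  def finish(%__MODULE__{acc: acc, finish: finish}), do: finish.(acc)
end

# Hypothetical process-backed container: pushed values live inside an Agent.
agent_pc =
  PushContainer.build(
    # initialise: start the process that owns the state
    fn ->
      {:ok, pid} = Agent.start_link(fn -> [] end)
      pid
    end,
    # push: prepend inside the agent, keep the pid as the accumulator
    fn val, pid ->
      :ok = Agent.update(pid, fn items -> [val | items] end)
      pid
    end,
    # release: read the values back in insertion order and stop the process
    fn pid ->
      items = Agent.get(pid, &Enum.reverse/1)
      :ok = Agent.stop(pid)
      items
    end
  )

pc = PushContainer.init(agent_pc)
pc = PushContainer.push(pc, "a")
pc = PushContainer.push(pc, "b")
result = PushContainer.finish(pc)

IO.inspect(result, label: :agent)
```

The caller never learns that a process is involved. It only sees the same three stages as with the file container.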
Now that we are familiar with push containers as building blocks, let’s head back to the composability aspect.
Composing push containers
In the above example the function call random_binaries(..., 10) is pushing binaries to the container. Now say we want to encode the binaries using base64 before we push them. How would we support this?
As before, let’s create a function. This one is called map: it wraps the container’s original push with a given function and returns the result as a new push container. It is the same as Enum.map, but for push containers.
defmodule PushContainerHelpers do
  @spec map(PushContainer.t(), (any -> any)) :: PushContainer.t()
  def map(%PushContainer{} = inner_pc, func) do
    PushContainer.build(
      fn -> PushContainer.init(inner_pc) end,
      fn elem, inner_pc -> PushContainer.push(inner_pc, func.(elem)) end,
      fn inner_pc -> PushContainer.finish(inner_pc) end
    )
  end
end
Notice that map takes a push container and returns a push container.
import PushContainerHelpers

encoded =
  random_binaries(
    map(binary_pc, &Base.encode64/1),
    10
  )

IO.puts("Base64 encoded binaries: #{encoded}")

# string -> encode64 -> upcase -> collect
upcased_encoded =
  random_binaries(
    map(
      map(
        binary_pc,
        &String.upcase/1
      ),
      &Base.encode64/1
    ),
    10
  )

IO.puts("Base64 encoded binaries in upcase: #{upcased_encoded}")
We can compose push containers like this by plugging in functions dynamically and building new push containers on the fly. This works, but as you can see in the last example, it is not easy to read. Let’s see if it improves when we format it differently or use the pipe.
Readability
Does formatting improve the readability of the above examples?
import PushContainerHelpers
# string -> encode64 -> upcase -> collect-as-string
# formatting as single line
random_binaries(map(map(binary_pc, &String.upcase/1), &Base.encode64/1), 10)
# formatting using pipe
binary_pc
|> map(&String.upcase/1)
|> map(&Base.encode64/1)
|> random_binaries(10)
As you can see, formatting is not helping. Using our beloved pipe makes reading even worse! We have to read the expressions from bottom to top to understand what is going on. We can visualise it to make the issue clearer.
Sort of works for small chains, but we still need to address it for long expressions.
Using pipes does not work at all with push containers!
What is going on here?
The issue is that we prefer to read an expression as INPUT, FUNCTION, OUTPUT, in that exact order. This is especially important when we have nested function calls with multiple parameters. Why do we like expressions to be in this order? I don’t know for sure, but my guess is that this is how most programming languages are modelled. We are trained to read code like that and expect the order of reading to match the logical order of execution. It might also be because most of our languages are written left-to-right, top-to-bottom (there are a few exceptions). Or, more fundamentally, it is the natural order of cause and effect. If the order of an expression is not INPUT, FUNCTION, OUTPUT, it will be counterintuitive. This is also the reason why we have the pipe operator |> in the first place. Pipe is basically macro magic we use to make function chaining more readable by changing the order of expressions.
# Enum functions without pipe:
#
# Enum.map(...)
# FUNCTION, INPUT
#
# OUTPUT is implicit here as the return value
Enum.map(Enum.map(random_binaries(10), &String.upcase/1), &Base.encode64/1)
# Enum functions with pipe
#
# ... |> Enum.map()
# INPUT, FUNCTION
random_binaries(10)
|> Enum.map(&String.upcase/1)
|> Enum.map(&Base.encode64/1)
Pipe works for Enum because the first argument we pass to an Enum function is the “input”. So moving the input expression to the line before the function call makes the movement of values appear natural.
It is more obvious if we visualise this:
As we can see, using the pipe when multiple functions are involved is pleasant to read. By using the pipe we turned FUNCTION INPUT into INPUT FUNCTION, which matches perfectly with how we read code.
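To see the reordering the pipe performs, we can ask the standard library to do the same rewrite on quoted code. This is only a sketch: Macro.pipe/3 is the public helper that inserts an expression into a call at a given argument position, and f, g, input and arg are placeholder names that are never executed.

```elixir
# Position 0 means "insert the piped value as the first argument".
step1 = Macro.pipe(quote(do: input), quote(do: f(arg)), 0)
step2 = Macro.pipe(step1, quote(do: g()), 0)

# The chain `input |> f(arg) |> g()` ends up as ordinary nested calls.
rewritten = Macro.to_string(step2)
IO.puts(rewritten)
# prints: g(f(input, arg))
```

The code we write reads INPUT first, while the code that is compiled is the nested FUNCTION(INPUT) form.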
Why does the pipe not work for push containers?
# Without pipe:
#
# FUNCTION, OUTPUT, INPUT
random_binaries(binary_pc, 10)
# With pipe:
#
# OUTPUT, FUNCTION, INPUT
binary_pc
|> random_binaries(10)
The “push container” value (binary_pc in the example) we are passing is not an input to the operation in the logical sense. It is where the function should push its output. So using the pipe here breaks our expectation and looks confusing.
How to solve this?
Let’s correct the order from FUNCTION, OUTPUT, INPUT to FUNCTION, INPUT, OUTPUT by making the push container value (binary_pc) the last argument to the function.
# formatting as single line
random_binaries(10, map(&Base.encode64/1, map(&String.upcase/1, binary_pc)))

# formatting as multiple lines
random_binaries(
  10,
  map(
    &Base.encode64/1,
    map(
      &String.upcase/1,
      binary_pc
    )
  )
)
When we visualise this:
This reads much better and nearly addresses our concern. Although formatting nested function calls across multiple lines makes them more readable, we can go one step further and create a new operator like |> for cases like this. Our new operator takes two expressions A and B, and appends B as the last argument to A. One important thing to note is that the operator must be right associative for this to work. Why it won’t work otherwise is left as an exercise to readers :)
Let’s pick the +++ operator for demonstration purposes, which is right associative.
defmodule PushOp do
  # Rewrites `call(args...) +++ right` into `call(args..., right)`
  defmacro left +++ right do
    {func, meta, args} = left
    {func, meta, args ++ [right]}
  end
end
require PushOp
import PushOp
string =
random_binaries(10)
+++ map(&Base.encode64/1)
+++ map(&String.upcase/1)
+++ binary_pc
IO.puts("=> #{string}")
This is even better than the previous code, since we got back to our INPUT, FUNCTION, OUTPUT ordering, exactly like how the pipe operator worked for Enum. To recap, all we did was make the collectable param the last param and create a helper macro to improve the ordering of the expression.
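If you want a head start on the associativity exercise, one way to explore it is to look at how the parser groups a chain of operators. The sketch below only inspects quoted code, so a, b, c and sink are placeholders that are never called.

```elixir
# `+++` groups to the right: a() +++ (b() +++ c())
{:+++, _, [left, {:+++, _, [_mid, _right]}]} = quote(do: a() +++ b() +++ c())

# `|>` groups to the left: (a() |> b()) |> c()
{:|>, _, [{:|>, _, [_first, _second]}, _third]} = quote(do: a() |> b() |> c())

# With right grouping, the outermost `+++` sees the producer call on its left
# and the whole rest of the chain on its right, so appending works:
{func, meta, args} = quote(do: random_binaries(10))
appended = Macro.to_string({func, meta, args ++ [quote(do: sink)]})

IO.inspect({Macro.to_string(left), appended})
```

With left grouping, the left side of the outermost operator would itself be an operator expression rather than the producer call, so there would be no sensible argument list to append to.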
Now that the readability side quest is complete, let’s get back to the main mission.
Remaining push container functions
Implementing other basic helper functions like reduce and filter is straightforward. Let’s bundle all of them in a Contr module, similar to the Enum module for Enumerable.
defmodule Contr do
  # `func` receives each element and the initialised inner container,
  # and must return the (possibly updated) inner container
  def reduce(func, %PushContainer{} = pc) do
    PushContainer.build(
      fn ->
        PushContainer.init(pc)
      end,
      fn elem, pc ->
        func.(elem, pc)
      end,
      fn pc ->
        PushContainer.finish(pc)
      end
    )
  end

  # redefining map using reduce
  def map(func, %PushContainer{} = pc) do
    reduce(
      fn elem, pc ->
        PushContainer.push(pc, func.(elem))
      end,
      pc
    )
  end

  def filter(func, %PushContainer{} = pc) do
    reduce(
      fn elem, pc ->
        if func.(elem) do
          PushContainer.push(pc, elem)
        else
          pc
        end
      end,
      pc
    )
  end

  # routes each element to one of two containers based on `func`
  def split(func, %PushContainer{} = true_pc, %PushContainer{} = false_pc) do
    PushContainer.build(
      fn ->
        {PushContainer.init(true_pc), PushContainer.init(false_pc)}
      end,
      fn elem, {t, f} ->
        if func.(elem) do
          {PushContainer.push(t, elem), f}
        else
          {t, PushContainer.push(f, elem)}
        end
      end,
      fn {t, f} ->
        {PushContainer.finish(t), PushContainer.finish(f)}
      end
    )
  end
end
Equipped with these, we can compose push containers in multiple ways similar to Enum.
import Example
import PushOp
_string =
random_binaries(10)
+++ Contr.map(fn bin -> Base.encode64(bin) end)
+++ Contr.filter(fn str -> str =~ ~r/^[a-zA-Z=]+$/ end)
+++ binary_pc
# => "olHQItQ=OWPYlAY="
The only thing pending now is to bridge PushContainer and the Collectable protocol for interoperability, so we can take advantage of modules which already implement the Collectable protocol.
defmodule PushContainer do
  defstruct [:acc, :init, :func, :finish]

  def build(init, func, finish) do
    %__MODULE__{init: init, func: func, finish: finish}
  end

  def init(%PushContainer{init: init} = pc) do
    %PushContainer{pc | acc: init.(), init: :done}
  end

  def push(%PushContainer{acc: acc, func: func} = pc, value) do
    %PushContainer{pc | acc: func.(value, acc)}
  end

  # `status` is :done or :halt; it defaults to :done so that helpers
  # calling finish/1 keep working
  def finish(%PushContainer{acc: acc, finish: finish}, status \\ :done) do
    if is_function(finish, 1) do
      finish.(acc)
    else
      finish.(acc, status)
    end
  end

  def from_collectable(collectable) do
    PushContainer.build(
      fn ->
        Collectable.into(collectable)
      end,
      # push/2 calls func.(value, acc), so the element comes first;
      # the collector function is carried along in the accumulator
      fn elem, {acc, collector_func} ->
        {collector_func.(acc, {:cont, elem}), collector_func}
      end,
      fn {acc, collector_func}, status ->
        collector_func.(acc, status)
      end
    )
  end
end
defimpl Collectable, for: PushContainer do
  def into(%PushContainer{} = pc) do
    collector_func = fn
      acc, {:cont, elem} ->
        PushContainer.push(acc, elem)

      acc, :halt ->
        PushContainer.finish(acc, :halt)

      acc, :done ->
        PushContainer.finish(acc, :done)
    end

    acc = PushContainer.init(pc)
    {acc, collector_func}
  end
end
import Example
import PushOp, only: [{:+++, 2}]

square_set_container =
  Contr.map(
    fn num -> num * num end,
    PushContainer.from_collectable(MapSet.new())
  )

square_set = Enum.into(1..10, square_set_container)
# => #MapSet<[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]>
As for the original sample problem about the stream of logs:
import Example
import PushOp, only: [{:+++, 2}]

defmodule Demo do
  # the stream is passed in explicitly, because variables defined
  # outside a module are not visible inside its functions
  def collect_logs(log_stream, sink) do
    log_stream
    |> Stream.take(10)
    |> Enum.into(sink)
  end
end

error_sink =
  Contr.map(fn "error: " <> line -> line end)
  +++ Contr.map(fn line -> String.upcase(line) end)
  +++ list_pc

warn_sink =
  Contr.map(fn "warn: " <> line -> line end)
  +++ Contr.map(fn line -> line <> "\n" end)
  +++ binary_pc

# returns a {warn_binary, error_list} tuple
Demo.collect_logs(
  log_stream,
  Contr.split(
    &String.starts_with?(&1, "warn: "),
    warn_sink,
    error_sink
  )
)
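The original problem also mentioned buffering logs before making expensive network calls. Here is a minimal sketch of how that could look as one more composable helper. BufferSketch.chunk_every/2 is a hypothetical name, not part of Contr above, and the snippet repeats a compact PushContainer with the simpler one-argument finish so it runs on its own.

```elixir
# Compact copy of the earlier PushContainer (finish/1 variant), for self-containment.
defmodule PushContainer do
  defstruct [:acc, :init, :func, :finish]

  def build(init, func, finish), do: %__MODULE__{init: init, func: func, finish: finish}
  def init(%__MODULE__{init: init} = pc), do: %__MODULE__{pc | acc: init.(), init: :done}
  def push(%__MODULE__{acc: acc, func: func} = pc, value), do: %__MODULE__{pc | acc: func.(value, acc)}
  def finish(%__MODULE__{acc: acc, finish: finish}), do: finish.(acc)
end

defmodule BufferSketch do
  # Hypothetical helper: collects `size` elements, then pushes them downstream
  # as one list. Leftover elements are flushed when the container is finished.
  def chunk_every(size, %PushContainer{} = pc) when size > 0 do
    PushContainer.build(
      fn -> {[], PushContainer.init(pc)} end,
      fn elem, {buffer, inner} ->
        buffer = [elem | buffer]

        if length(buffer) >= size do
          {[], PushContainer.push(inner, Enum.reverse(buffer))}
        else
          {buffer, inner}
        end
      end,
      fn
        {[], inner} ->
          PushContainer.finish(inner)

        {buffer, inner} ->
          PushContainer.finish(PushContainer.push(inner, Enum.reverse(buffer)))
      end
    )
  end
end

list_pc = PushContainer.build(fn -> [] end, fn val, acc -> acc ++ [val] end, fn acc -> acc end)
batched = BufferSketch.chunk_every(2, list_pc)

pc = Enum.reduce(1..5, PushContainer.init(batched), fn n, pc -> PushContainer.push(pc, n) end)
result = PushContainer.finish(pc)

IO.inspect(result, label: :batches)
```

In a real sink, list_pc would be replaced by a container whose push makes the network call, so each call carries a whole batch instead of a single log line.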
To read more about streams in Elixir: